You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:49 UTC
[25/63] [abbrv] git commit: Redesign Scheduler part 2
Redesign Scheduler part 2
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e6aadfcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e6aadfcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e6aadfcc
Branch: refs/heads/master
Commit: e6aadfccdaf02b9df65d10ac835cab7fd26e274e
Parents: aa7550a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 25 03:09:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../flink/client/CliFrontendListCancelTest.java | 7 -
.../org/apache/flink/runtime/AbstractID.java | 49 +-
.../runtime/execution/ExecutionListener.java | 11 -
.../runtime/executiongraph/ExecutionGraph.java | 11 +-
.../executiongraph/ExecutionVertex2.java | 63 +-
.../VertexAssignmentListener.java | 40 --
.../flink/runtime/filecache/FileCache.java | 2 +-
.../flink/runtime/instance/AllocatedSlot.java | 84 ++-
.../flink/runtime/instance/AllocationID.java | 32 -
.../apache/flink/runtime/instance/Hardware.java | 2 +-
.../apache/flink/runtime/instance/Instance.java | 202 ++++--
.../instance/InstanceConnectionInfo.java | 6 +-
.../flink/runtime/instance/InstanceID.java | 6 +-
.../runtime/instance/InstanceListener.java | 10 +
.../flink/runtime/jobgraph/JobVertexID.java | 6 +-
.../flink/runtime/jobmanager/JobManager.java | 102 +--
.../jobmanager/scheduler/DefaultScheduler.java | 528 ++++++++--------
.../scheduler/InstanceFillDegreeComparator.java | 31 -
.../jobmanager/scheduler/ScheduledUnit.java | 68 +-
.../scheduler/SchedulingStrategy.java | 33 -
.../runtime/jobmanager/scheduler/SetQueue.java | 134 ++++
.../jobmanager/scheduler/SharedSlot.java | 99 +++
.../scheduler/SlotAllocationFuture.java | 94 +++
.../scheduler/SlotAllocationFutureAction.java | 34 +
.../scheduler/SlotAvailablilityListener.java | 29 +
.../jobmanager/scheduler/SlotSharingGroup.java | 70 ++
.../scheduler/SlotSharingGroupAssignment.java | 270 ++++++++
.../runtime/jobmanager/scheduler/SubSlot.java | 69 ++
.../jobmanager/web/SetupInfoServlet.java | 2 +-
.../protocols/ExtendedManagementProtocol.java | 10 -
.../protocols/TaskOperationProtocol.java | 5 -
.../taskmanager/ExecutorThreadFactory.java | 41 --
.../flink/runtime/taskmanager/TaskManager.java | 1 +
.../runtime/ExecutorThreadFactory.java | 41 --
.../runtime/util/ExecutorThreadFactory.java | 38 ++
.../flink/runtime/util/NativeCodeLoader.java | 129 ----
.../apache/flink/runtime/AbstractIDTest.java | 75 ++-
.../flink/runtime/instance/HardwareTest.java | 25 +-
.../instance/InstanceConnectionInfoTest.java | 25 +-
.../flink/runtime/instance/InstanceTest.java | 174 +++++
.../instance/LocalInstanceManagerTest.java | 4 -
.../scheduler/DefaultSchedulerTest.java | 180 ------
.../scheduler/SchedulerIsolatedTasksTest.java | 385 +++++++++++
.../scheduler/SchedulerSlotSharingTest.java | 633 +++++++++++++++++++
.../scheduler/SchedulerTestUtils.java | 145 +++++
.../jobmanager/scheduler/SetQueueTest.java | 110 ++++
.../jobmanager/scheduler/SharedSlotsTest.java | 118 ++++
.../scheduler/SlotAllocationFutureTest.java | 177 ++++++
48 files changed, 3328 insertions(+), 1082 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index fa4d9a3..f335745 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
import static org.junit.Assert.assertTrue;
@@ -28,7 +27,6 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.CliFrontend;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
@@ -197,11 +195,6 @@ public class CliFrontendListCancelTest {
}
@Override
- public void killInstance(StringRecord instanceName) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void logBufferUtilization(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
index 73cd8fc..458907c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java
@@ -32,7 +32,7 @@ import io.netty.buffer.ByteBuf;
/**
* A statistically unique identification number.
*/
-public class AbstractID implements IOReadableWritable {
+public class AbstractID implements IOReadableWritable, Comparable<AbstractID> {
/** The size of a long in bytes */
private static final int SIZE_OF_LONG = 8;
@@ -139,24 +139,8 @@ public class AbstractID implements IOReadableWritable {
this.lowerPart = src.lowerPart;
this.upperPart = src.upperPart;
}
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof AbstractID) {
- AbstractID src = (AbstractID) obj;
- return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return ((int) this.lowerPart) ^
- ((int) (this.lowerPart >>> 32)) ^
- ((int) this.upperPart) ^
- ((int) (this.upperPart >>> 32));
- }
+
+ // --------------------------------------------------------------------------------------------
@Override
public void read(DataInputView in) throws IOException {
@@ -180,6 +164,26 @@ public class AbstractID implements IOReadableWritable {
buf.writeLong(this.upperPart);
}
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof AbstractID) {
+ AbstractID src = (AbstractID) obj;
+ return src.lowerPart == this.lowerPart && src.upperPart == this.upperPart;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) this.lowerPart) ^
+ ((int) (this.lowerPart >>> 32)) ^
+ ((int) this.upperPart) ^
+ ((int) (this.upperPart >>> 32));
+ }
+
@Override
public String toString() {
final byte[] ba = new byte[SIZE];
@@ -187,4 +191,11 @@ public class AbstractID implements IOReadableWritable {
longToByteArray(this.upperPart, ba, SIZE_OF_LONG);
return StringUtils.byteToHexString(ba);
}
+
+ @Override
+ public int compareTo(AbstractID o) {
+ int diff1 = Long.compare(this.upperPart, o.upperPart);
+ int diff2 = Long.compare(this.lowerPart, o.lowerPart);
+ return diff1 == 0 ? diff2 : diff1;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
index 305f47d..ded630f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
@@ -25,20 +24,10 @@ import org.apache.flink.runtime.jobgraph.JobID;
/**
* This interface must be implemented by classes which should be able to receive notifications about
* changes of a task's execution state.
- *
*/
public interface ExecutionListener {
/**
- * Returns the priority of the execution listener object. If multiple execution listener objects are registered for
- * a given vertex, the priority determines in which order they will be called. Priorities are expressed as
- * non-negative integer values. The lower the integer value, the higher the call priority.
- *
- * @return the priority of this execution listener
- */
- int getPriority();
-
- /**
* Called when the execution state of the associated task has changed. It is important to point out that multiple
* execution listeners can be invoked as a reaction to a state change, according to their priority. As a result, the
* value of <code>newExecutionState</code> may be out-dated by the time a particular execution listener is called.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cea0271..bc95250 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.executiongraph;
import java.util.ArrayList;
@@ -43,8 +42,6 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.DummyInstance;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.io.network.gates.GateID;
@@ -57,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobInputVertex;
import org.apache.flink.runtime.jobgraph.JobOutputVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.StringUtils;
/**
@@ -1291,12 +1288,6 @@ public class ExecutionGraph implements ExecutionListener {
return this.stages.iterator();
}
-
- @Override
- public int getPriority() {
- return 1;
- }
-
/**
* Performs an asynchronous update operation to this execution graph.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index ab33ca0..09a6f5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -15,11 +15,70 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
public class ExecutionVertex2 {
+ private final JobVertexID jobVertexId;
+
+
+
+ public ExecutionVertex2() {
+ this(new JobVertexID());
+ }
+
+ public ExecutionVertex2(JobVertexID jobVertexId) {
+ this.jobVertexId = jobVertexId;
+ }
+
+
+
+ public JobID getJobId() {
+ return new JobID();
+ }
+
+
+ public JobVertexID getJobvertexId() {
+ return this.jobVertexId;
+ }
+
+ public String getTaskName() {
+ return "task";
+ }
+
+ public int getTotalNumberOfParallelSubtasks() {
+ return 1;
+ }
+
+ public int getParallelSubtaskIndex() {
+ return 0;
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // Scheduling
+ // --------------------------------------------------------------------------------------------
+
+ public Iterable<Instance> getPreferredLocations() {
+ return null;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
- public void handleException(Throwable t) {
- t.printStackTrace();
+ /**
+ * Creates a simple name representation in the style 'taskname (x/y)', where
+ * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+ * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+ * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
+ *
+ * @return A simple name representation.
+ */
+ public String getSimpleName() {
+ return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
deleted file mode 100644
index f09b10f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/VertexAssignmentListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.AllocatedResource;
-
-/**
- * Classes implementing the {@link VertexAssignmentListener} interface can register for notifications about changes in
- * the assignment of an {@link ExecutionVertex} to an {@link AllocatedResource}.
- *
- */
-public interface VertexAssignmentListener {
-
- /**
- * Called when an {@link ExecutionVertex} has been assigned to an {@link AllocatedResource}.
- *
- * @param id
- * the ID of the vertex which has been reassigned
- * @param newAllocatedResource
- * the allocated resource the vertex is now assigned to
- */
- void vertexAssignmentChanged(ExecutionVertexID id, AllocatedResource newAllocatedResource);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 35562de..63b4d54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -42,7 +42,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.taskmanager.runtime.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.IOUtils;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 6289d45..cb7e658 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -18,20 +18,24 @@
package org.apache.flink.runtime.instance;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
/**
* An allocated slot is the unit in which resources are allocated on instances.
*/
public class AllocatedSlot {
+
+ private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
+
+ private static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running
+ private static final int CANCELLED = 1; // no more tasks may run
+ private static final int RELEASED = 2; // has been given back to the instance
- /** The ID which identifies the resources occupied by this slot. */
- private final ResourceId resourceId;
-
+
/** The ID of the job this slice belongs to. */
private final JobID jobID;
@@ -41,12 +45,15 @@ public class AllocatedSlot {
/** The number of the slot on which the task is deployed */
private final int slotNumber;
- /** Flag that marks the schedule as active */
- private final AtomicBoolean active = new AtomicBoolean(true);
+ private volatile int status = ALLOCATED_AND_ALIVE;
+
- public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
- this.resourceId = resourceId;
+ public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
+ if (jobID == null || instance == null || slotNumber < 0) {
+ throw new IllegalArgumentException();
+ }
+
this.jobID = jobID;
this.instance = instance;
this.slotNumber = slotNumber;
@@ -63,10 +70,6 @@ public class AllocatedSlot {
return this.jobID;
}
- public ResourceId getResourceId() {
- return resourceId;
- }
-
public Instance getInstance() {
return instance;
}
@@ -87,7 +90,60 @@ public class AllocatedSlot {
return true;
}
- public void cancelResource() {
+ // --------------------------------------------------------------------------------------------
+ // Status and life cycle
+ // --------------------------------------------------------------------------------------------
+
+ public boolean isAlive() {
+ return status == ALLOCATED_AND_ALIVE;
+ }
+
+ public boolean isCanceled() {
+ return status != ALLOCATED_AND_ALIVE;
+ }
+
+ public boolean isReleased() {
+ return status == RELEASED;
+ }
+
+
+ public void cancel() {
+ if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
+ // kill all tasks currently running in this slot
+ }
+ }
+
+ public void releaseSlot() {
+ // cancel everything, if there is something. since this is atomically status based,
+ // it will not happen twice if another attempt happened before or concurrently
+ cancel();
+ this.instance.returnAllocatedSlot(this);
+ }
+
+ protected boolean markReleased() {
+ return STATUS_UPDATER.compareAndSet(this, CANCELLED, RELEASED);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return instance.getId() + " (" + slotNumber + ") - " + getStateName(status);
+ }
+
+ private static final String getStateName(int state) {
+ switch (state) {
+ case ALLOCATED_AND_ALIVE:
+ return "ALLOCATED/ALIVE";
+ case CANCELLED:
+ return "CANCELLED";
+ case RELEASED:
+ return "RELEASED";
+ default:
+ return "(unknown)";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
deleted file mode 100644
index c315dc3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocationID.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.AbstractID;
-
-/**
- * An allocation ID unambiguously identifies the allocated resources
- * within an {@link Instance}. The ID is necessary if an {@link InstanceManager} decides to partition
- * {@link Instance}s
- * without the knowledge of Nephele's scheduler.
- *
- */
-public class AllocationID extends AbstractID {
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
index 92ccfb2..aa61927 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Hardware.java
@@ -76,7 +76,7 @@ public class Hardware {
return -1;
default:
- LOG.error("Unrecognized OS");
+ LOG.error("Unrecognized OS: " + OperatingSystem.getCurrentOperatingSystem());
return -1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 3d39c8f..a168b2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -21,13 +21,15 @@ package org.apache.flink.runtime.instance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailablilityListener;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.eclipse.jetty.util.log.Log;
@@ -52,14 +54,19 @@ public class Instance {
/** The number of task slots available on the node */
private final int numberOfSlots;
-
+ /** A list of available slot positions */
private final Queue<Integer> availableSlots;
/** Allocated slots on this instance */
private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
+
+ /** A listener to be notified upon new slot availability */
+ private SlotAvailablilityListener slotListener;
+
+
/** The RPC proxy to send calls to the task manager represented by this instance */
- private volatile TaskOperationProtocol taskManager ;
+ private volatile TaskOperationProtocol taskManager;
/**
* Time when last heat beat has been received from the task manager running on this instance.
@@ -68,6 +75,8 @@ public class Instance {
private volatile boolean isDead;
+ // --------------------------------------------------------------------------------------------
+
/**
* Constructs an abstract instance object.
*
@@ -82,43 +91,36 @@ public class Instance {
this.resources = resources;
this.numberOfSlots = numberOfSlots;
- this.availableSlots = new ArrayDeque<Integer>();
+ this.availableSlots = new ArrayDeque<Integer>(numberOfSlots);
for (int i = 0; i < numberOfSlots; i++) {
this.availableSlots.add(i);
}
}
- public TaskOperationProtocol getTaskManagerProxy() throws IOException {
- TaskOperationProtocol tm = this.taskManager;
-
- if (tm == null) {
- synchronized (this) {
- if (this.taskManager == null) {
- this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
- new InetSocketAddress(getInstanceConnectionInfo().address(),
- getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
- }
- tm = this.taskManager;
- }
- }
-
- return tm;
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ public InstanceID getId() {
+ return instanceId;
}
-
- /** Destroys and removes the RPC stub object for this instance's task manager. */
- private void destroyTaskManagerProxy() {
- synchronized (this) {
- if (this.taskManager != null) {
- try {
- RPC.stopProxy(this.taskManager);
- } catch (Throwable t) {
- Log.debug("Error shutting down RPC proxy.", t);
- }
- }
- }
+
+ public HardwareDescription getResources() {
+ return this.resources;
+ }
+
+ public int getTotalNumberOfSlots() {
+ return numberOfSlots;
+ }
+
+ /**
+ * Returns the instance's connection information object.
+ *
+ * @return the instance's connection information object
+ */
+ public InstanceConnectionInfo getInstanceConnectionInfo() {
+ return this.instanceConnectionInfo;
}
-
-
// --------------------------------------------------------------------------------------------
// Life and Death
@@ -136,38 +138,52 @@ public class Instance {
isDead = true;
synchronized (instanceLock) {
- this.allocatedSlots.clear();
+
+ // no more notifications for the slot releasing
+ this.slotListener = null;
+
for (AllocatedSlot slot : allocatedSlots) {
- slot.cancelResource();
+ slot.releaseSlot();
}
+ allocatedSlots.clear();
+ availableSlots.clear();
}
destroyTaskManagerProxy();
}
// --------------------------------------------------------------------------------------------
- // Properties
+ // Connection to the TaskManager
// --------------------------------------------------------------------------------------------
- public InstanceID getId() {
- return instanceId;
- }
-
- public HardwareDescription getResources() {
- return this.resources;
- }
-
- public int getTotalNumberOfSlots() {
- return numberOfSlots;
+ public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+ TaskOperationProtocol tm = this.taskManager;
+
+ if (tm == null) {
+ synchronized (this) {
+ if (this.taskManager == null) {
+ this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
+ new InetSocketAddress(getInstanceConnectionInfo().address(),
+ getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
+ }
+ tm = this.taskManager;
+ }
+ }
+
+ return tm;
}
-
- /**
- * Returns the instance's connection information object.
- *
- * @return the instance's connection information object
- */
- public InstanceConnectionInfo getInstanceConnectionInfo() {
- return this.instanceConnectionInfo;
+
+ /** Destroys and removes the RPC stub object for this instance's task manager. */
+ private void destroyTaskManagerProxy() {
+ synchronized (this) {
+ if (this.taskManager != null) {
+ try {
+ RPC.stopProxy(this.taskManager);
+ } catch (Throwable t) {
+ Log.debug("Error shutting down RPC proxy.", t);
+ }
+ }
+ }
}
// --------------------------------------------------------------------------------------------
@@ -206,7 +222,11 @@ public class Instance {
// Resource allocation
// --------------------------------------------------------------------------------------------
- public AllocatedSlot allocateSlot(JobID jobID, ResourceId resourceId) throws InstanceDiedException {
+ public AllocatedSlot allocateSlot(JobID jobID) throws InstanceDiedException {
+ if (jobID == null) {
+ throw new IllegalArgumentException();
+ }
+
synchronized (instanceLock) {
if (isDead) {
throw new InstanceDiedException(this);
@@ -216,27 +236,95 @@ public class Instance {
if (nextSlot == null) {
return null;
} else {
- AllocatedSlot slot = new AllocatedSlot(jobID, resourceId, this, nextSlot);
+ AllocatedSlot slot = new AllocatedSlot(jobID, this, nextSlot);
allocatedSlots.add(slot);
return slot;
}
}
}
+ public boolean returnAllocatedSlot(AllocatedSlot slot) {
+ // the slot needs to be in the returned to instance state
+ if (slot == null || slot.getInstance() != this) {
+ throw new IllegalArgumentException("Slot is null or belongs to the wrong instance.");
+ }
+ if (slot.isAlive()) {
+ throw new IllegalArgumentException("Slot is still alive");
+ }
+
+ if (slot.markReleased()) {
+ synchronized (instanceLock) {
+ if (isDead) {
+ return false;
+ }
+
+ if (this.allocatedSlots.remove(slot)) {
+ this.availableSlots.add(slot.getSlotNumber());
+
+ if (this.slotListener != null) {
+ this.slotListener.newSlotAvailable(this);
+ }
+
+ return true;
+ } else {
+ throw new IllegalArgumentException("Slot was not allocated from the instance.");
+ }
+ }
+ } else {
+ return false;
+ }
+ }
+
+ public void cancelAndReleaseAllSlots() {
+ synchronized (instanceLock) {
+ // we need to do this copy because of concurrent modification exceptions
+ List<AllocatedSlot> copy = new ArrayList<AllocatedSlot>(this.allocatedSlots);
+
+ for (AllocatedSlot slot : copy) {
+ slot.releaseSlot();
+ }
+ allocatedSlots.clear();
+ }
+ }
+
public int getNumberOfAvailableSlots() {
return this.availableSlots.size();
}
+ public int getNumberOfAllocatedSlots() {
+ return this.allocatedSlots.size();
+ }
+
public boolean hasResourcesAvailable() {
return !isDead && getNumberOfAvailableSlots() > 0;
}
// --------------------------------------------------------------------------------------------
+ // Listeners
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Registers the listener that is notified when a slot of this instance is returned and becomes
+ * available again. At most one listener can be registered at a time.
+ *
+ * @param slotListener The listener to register.
+ * @throws IllegalStateException If a listener is already registered.
+ */
+ public void setSlotAvailabilityListener(SlotAvailablilityListener slotListener) {
+ synchronized (instanceLock) {
+ if (this.slotListener != null) {
+ throw new IllegalStateException("Instance has already a slot listener.");
+ } else {
+ this.slotListener = slotListener;
+ }
+ }
+ }
+
+ /**
+ * Unregisters the slot availability listener, if one is registered. Slots returned afterwards
+ * do not trigger any notification until a new listener is set.
+ */
+ public void removeSlotListener() {
+ synchronized (instanceLock) {
+ this.slotListener = null;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
// Standard Utilities
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return "Instance (" + this.instanceConnectionInfo + "), resources: " + this.resources + ", numberOfSlots=" + numberOfSlots;
+ return instanceId + " @" + this.instanceConnectionInfo + ' ' + numberOfSlots + " slots";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index 4cbb2a9..ec63c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -67,18 +67,18 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* the port instance's task manager expects to receive transfer envelopes on
*/
public InstanceConnectionInfo(InetAddress inetAddress, int ipcPort, int dataPort) {
-
if (inetAddress == null) {
throw new IllegalArgumentException("Argument inetAddress must not be null");
}
-
if (ipcPort <= 0) {
throw new IllegalArgumentException("Argument ipcPort must be greater than zero");
}
-
if (dataPort <= 0) {
throw new IllegalArgumentException("Argument dataPort must be greater than zero");
}
+ if (ipcPort == dataPort) {
+ throw new IllegalArgumentException("IPC and data port must be different");
+ }
this.ipcPort = ipcPort;
this.dataPort = dataPort;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
index 388bba2..9c922af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceID.java
@@ -16,15 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.AbstractID;
/**
* Class for statistically unique instance IDs.
- *
*/
-public class InstanceID extends AbstractID {
-
-}
+public class InstanceID extends AbstractID {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
index 76e63b8..d42d404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceListener.java
@@ -24,7 +24,17 @@ package org.apache.flink.runtime.instance;
*/
public interface InstanceListener {
+ /**
+ * Called when a new instance becomes available.
+ *
+ * @param instance The instance that became available.
+ */
void newInstanceAvailable(Instance instance);
+ /**
+ * Called when an instance died.
+ *
+ * @param instance The instance that died.
+ */
void instanceDied(Instance instance);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
index ba76e02..6a56c19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertexID.java
@@ -16,15 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobgraph;
import org.apache.flink.runtime.AbstractID;
/**
* A class for statistically unique job vertex IDs.
- *
*/
-public class JobVertexID extends AbstractID {
-
-}
+public class JobVertexID extends AbstractID {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 1f5fc96..3b76b78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -70,11 +70,11 @@ import org.apache.flink.runtime.executiongraph.GraphConversionException;
import org.apache.flink.runtime.executiongraph.InternalJobStatus;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.DefaultInstanceManager;
-import org.apache.flink.runtime.instance.DummyInstance;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
@@ -87,8 +87,6 @@ import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulingException;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitManager;
import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
@@ -101,14 +99,13 @@ import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
-import org.apache.flink.runtime.taskmanager.ExecutorThreadFactory;
import org.apache.flink.runtime.taskmanager.TaskCancelResult;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskKillResult;
import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;
import org.apache.log4j.ConsoleAppender;
@@ -130,32 +127,32 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private static final Log LOG = LogFactory.getLog(JobManager.class);
+ private final static int FAILURE_RETURN_CODE = 1;
+
+
+ private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
+
private final Server jobManagerServer;
- private final JobManagerProfiler profiler;
-
private final EventCollector eventCollector;
private final ArchiveListener archive;
- private final InputSplitManager inputSplitManager;
-
+ private final InstanceManager instanceManager;
+
private final DefaultScheduler scheduler;
- private AccumulatorManager accumulatorManager;
-
- private InstanceManager instanceManager;
+ private final AccumulatorManager accumulatorManager;
+
private final int recommendedClientPollingInterval;
- private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
-
- private final static int FAILURE_RETURN_CODE = 1;
-
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
+
private WebInfoServer server;
@@ -199,8 +196,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// accumulator results.
this.accumulatorManager = new AccumulatorManager(Math.min(1, archived_items));
- // Load the input split manager
- this.inputSplitManager = new InputSplitManager();
// Determine own RPC address
final InetSocketAddress rpcServerAddress = new InetSocketAddress(ipcAddress, ipcPort);
@@ -219,7 +214,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Try to load the instance manager for the given execution mode
if (executionMode == ExecutionMode.LOCAL) {
- this.instanceManager = new Lo
+ final int numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+ this.instanceManager = new LocalInstanceManager(numTaskManagers);
}
else if (executionMode == ExecutionMode.CLUSTER) {
this.instanceManager = new DefaultInstanceManager();
@@ -237,19 +233,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (this.scheduler == null) {
throw new Exception("Unable to load scheduler " + schedulerClassName);
}
-
- // Load profiler if it should be used
- if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
- final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.JOBMANAGER_CLASSNAME_KEY,
- "org.apache.flink.runtime.profiling.impl.JobManagerProfilerImpl");
- this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName, ipcAddress);
- if (this.profiler == null) {
- throw new Exception("Cannot load profiler");
- }
- } else {
- this.profiler = null;
- LOG.debug("Profiler disabled");
- }
}
public void shutdown() {
@@ -263,11 +246,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
this.instanceManager.shutdown();
}
- // Stop profiling if enabled
- if (this.profiler != null) {
- this.profiler.shutdown();
- }
-
// Stop RPC server
if (this.jobManagerServer != null) {
this.jobManagerServer.stop();
@@ -458,12 +436,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LOG.debug("The dependency chain for instance sharing is acyclic");
}
- // Check if the job will be executed with profiling enabled
- boolean jobRunsWithProfiling = false;
- if (this.profiler != null && job.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
- jobRunsWithProfiling = true;
- }
-
// Try to create initial execution graph from job graph
LOG.info("Creating initial execution graph from job graph " + job.getName());
ExecutionGraph eg;
@@ -485,22 +457,9 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Register job with the progress collector
if (this.eventCollector != null) {
- this.eventCollector.registerJob(eg, jobRunsWithProfiling, System.currentTimeMillis());
+ this.eventCollector.registerJob(eg, false, System.currentTimeMillis());
}
- // Check if profiling should be enabled for this job
- if (jobRunsWithProfiling) {
- this.profiler.registerProfilingJob(eg);
-
- if (this.eventCollector != null) {
- this.profiler.registerForProfilingData(eg.getJobID(), this.eventCollector);
- }
-
- }
-
- // Register job with the dynamic input split assigner
- this.inputSplitManager.registerJob(eg);
-
// Register for updates on the job status
eg.registerJobStatusListener(this);
@@ -902,41 +861,12 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
eg.executeCommand(runnable);
}
-
- @Override
- public void killInstance(final StringRecord instanceName) throws IOException {
-
- final Instance instance = this.instanceManager.getInstanceByName(instanceName.toString());
- if (instance == null) {
- LOG.error("Cannot find instance with name " + instanceName + " to kill it");
- return;
- }
-
- LOG.info("Killing task manager on instance " + instance);
-
- final Runnable runnable = new Runnable() {
-
- @Override
- public void run() {
- try {
- instance.killTaskManager();
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
- }
- };
-
- // Hand it over to the executor service
- this.executorService.execute(runnable);
- }
-
/**
* Tests whether the job manager has been shut down completely.
*
* @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
*/
public boolean isShutDown() {
-
return this.isShutDown;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index e038d7d..3cae48b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -18,170 +18,293 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayDeque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
/**
* The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
* slots.
- * <p>
- * The scheduler's bookkeeping on the available instances is lazy: It is not modified once an
- * instance is dead, but it will lazily remove the instance from its pool as soon as it tries
- * to allocate a resource on that instance and it fails with an {@link InstanceDiedException}.
*/
-public class DefaultScheduler implements InstanceListener {
+public class DefaultScheduler implements InstanceListener, SlotAvailablilityListener {
protected static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
-
- private final Object lock = new Object();
+
+ private final Object globalLock = new Object();
+
/** All instances that the scheduler can deploy to */
private final Set<Instance> allInstances = new HashSet<Instance>();
/** All instances that still have available resources */
- private final Queue<Instance> instancesWithAvailableResources = new LifoSetQueue<Instance>();
-
-
- private final ConcurrentHashMap<ResourceId, AllocatedSlot> allocatedSlots = new ConcurrentHashMap<ResourceId, AllocatedSlot>();
-
-// /** A cache that remembers the last resource IDs it has seen, to co-locate future
-// * deployments of tasks with the same resource ID to the same instance.
-// */
-// private final Cache<ResourceId, Instance> ghostCache;
+ private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
/** All tasks pending to be scheduled */
- private final Queue<ScheduledUnit> taskQueue = new ArrayDeque<ScheduledUnit>();
-
+ private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
- /** The thread that runs the scheduling loop, picking up tasks to be scheduled and scheduling them. */
- private final Thread schedulerThread;
+ private int unconstrainedAssignments = 0;
- /** Atomic flag to safely control the shutdown */
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private int localizedAssignments = 0;
- /** Flag indicating whether the scheduler should reject a unit if it cannot find a resource
- * for it at the time of scheduling */
- private final boolean rejectIfNoResourceAvailable;
+ private int nonLocalizedAssignments = 0;
-
public DefaultScheduler() {
- this(true);
- }
-
- public DefaultScheduler(boolean rejectIfNoResourceAvailable) {
- this.rejectIfNoResourceAvailable = rejectIfNoResourceAvailable;
-
-// this.ghostCache = CacheBuilder.newBuilder()
-// .initialCapacity(64) // easy start
-// .maximumSize(1024) // retain some history
-// .weakValues() // do not prevent dead instances from being collected
-// .build();
-
- // set up (but do not start) the scheduling thread
- Runnable loopRunner = new Runnable() {
- @Override
- public void run() {
- runSchedulerLoop();
- }
- };
- this.schedulerThread = new Thread(loopRunner, "Scheduling Thread");
- }
-
- public void start() {
- if (shutdown.get()) {
- throw new IllegalStateException("Scheduler has been shut down.");
- }
-
- try {
- this.schedulerThread.start();
- }
- catch (IllegalThreadStateException e) {
- throw new IllegalStateException("The scheduler has already been started.");
- }
}
/**
* Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
*/
public void shutdown() {
- if (this.shutdown.compareAndSet(false, true)) {
- // clear the task queue and add the termination signal, to let
- // the scheduling loop know that things are done
- this.taskQueue.clear();
- this.taskQueue.add(TERMINATION_SIGNAL);
-
- // interrupt the scheduling thread, in case it was waiting for resources to
- // show up to deploy a task
- this.schedulerThread.interrupt();
+ synchronized (globalLock) {
+ for (Instance i : allInstances) {
+ i.removeSlotListener();
+ i.cancelAndReleaseAllSlots();
+ }
+ allInstances.clear();
+ instancesWithAvailableResources.clear();
+ taskQueue.clear();
}
}
-
- public void setUncaughtExceptionHandler(UncaughtExceptionHandler handler) {
- if (this.schedulerThread.getState() != Thread.State.NEW) {
- throw new IllegalStateException("Can only add exception handler before starting the scheduler.");
+
+ /**
+ * Sums up the free slots of all instances in the set of instances with available resources.
+ * <p>
+ * NOTE: In the presence of multi-threaded operations, this number may be inexact.
+ *
+ * @return The number of empty slots, for tasks.
+ */
+ public int getNumberOfAvailableSlots() {
+ int count = 0;
+
+ synchronized (globalLock) {
+ // the per-instance counts are read without the instances' own locks, hence "inexact"
+ for (Instance instance : instancesWithAvailableResources) {
+ count += instance.getNumberOfAvailableSlots();
+ }
}
- this.schedulerThread.setUncaughtExceptionHandler(handler);
+
+ return count;
}
-
// --------------------------------------------------------------------------------------------
// Scheduling
// --------------------------------------------------------------------------------------------
+ /**
+ * Schedules the given task without queueing: either a slot is obtained right away, or the call fails.
+ *
+ * @param task The task to schedule.
+ * @return The slot obtained for the task.
+ * @throws NoResourceAvailableException If no slot is available at the moment.
+ */
+ public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
+ Object ret = scheduleTask(task, false);
+ if (ret instanceof AllocatedSlot) {
+ return (AllocatedSlot) ret;
+ }
+ // with queueing disabled, scheduleTask returns a slot or throws; anything else is an internal error
+ throw new IllegalStateException("Unexpected result from non-queued scheduling: " + ret);
+ }
+
+ /**
+ * Schedules the given task, queueing the request if no slot is available right now.
+ *
+ * @param task The task to schedule.
+ * @return A future for the task's slot. If a slot was obtained immediately, the future wraps that slot.
+ * @throws NoResourceAvailableException Declared by the underlying scheduling call; with queueing
+ *                                      enabled a lack of resources leads to queueing instead.
+ */
+ public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
+ Object ret = scheduleTask(task, true);
+ if (ret instanceof AllocatedSlot) {
+ return new SlotAllocationFuture((AllocatedSlot) ret);
+ }
+ else if (ret instanceof SlotAllocationFuture) {
+ return (SlotAllocationFuture) ret;
+ }
+ // scheduleTask only returns a slot or a future; anything else is an internal error
+ throw new IllegalStateException("Unexpected result from queued scheduling: " + ret);
+ }
+
/**
- * @param task
- * @param queueIfNoResource If true, this call will queue the request if no resource is immediately
- * available. If false, it will throw a {@link NoResourceAvailableException}
- * if no resource is immediately available.
+ * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}.
*/
- public void scheduleTask(ScheduledUnit task, boolean queueIfNoResource) {
+ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
if (task == null) {
throw new IllegalArgumentException();
}
- // if there is already a slot for that resource
- AllocatedSlot existing = this.allocatedSlots.get(task.getSharedResourceId());
- if (existing != null) {
- // try to attach to the existing slot
- if (existing.runTask(task.getTaskVertex())) {
- // all good, we are done
- return;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling task " + task);
+ }
+
+ synchronized (globalLock) {
+ // 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
+// CoLocationHint hint = task.getCoScheduleHint();
+// if (hint != null) {
+//
+// // try to add to the slot, or make it wait on the hint and schedule the hint itself
+// if () {
+// return slot;
+// }
+// }
+
+ // 2) See if we can place the task somewhere together with another existing task.
+ // This is defined by the slot sharing groups
+ SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+ if (sharingUnit != null) {
+ // see if we can add the task to the current sharing group.
+ SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
+ AllocatedSlot slot = assignment.getSlotForTask(task.getJobVertexId(), task.getTaskVertex());
+ if (slot != null) {
+ return slot;
+ }
+ }
+
+ // 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
+
+ // we need potentially to loop multiple times, because there may be false positives
+ // in the set-with-available-instances
+ while (true) {
+ Instance instanceToUse = getFreeInstanceForTask(task.getTaskVertex());
+
+ if (instanceToUse != null) {
+ try {
+ AllocatedSlot slot = instanceToUse.allocateSlot(task.getTaskVertex().getJobId());
+
+ // if the instance has further available slots, re-add it to the set of available resources.
+ if (instanceToUse.hasResourcesAvailable()) {
+ this.instancesWithAvailableResources.add(instanceToUse);
+ }
+
+ if (slot != null) {
+
+ // if the task is in a shared group, assign the slot to that group
+ // and get a sub slot in turn
+ if (sharingUnit != null) {
+ slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
+ }
+
+ // try to run the task
+ if (slot.runTask(task.getTaskVertex())) {
+ return slot;
+ } else {
+ // did not assign, so we recycle the resource
+ slot.releaseSlot();
+ }
+ }
+ }
+ catch (InstanceDiedException e) {
+ // the instance died it has not yet been propagated to this scheduler
+ // remove the instance from the set of available instances
+ this.allInstances.remove(instanceToUse);
+ this.instancesWithAvailableResources.remove(instanceToUse);
+ }
+ }
+ else {
+ // no resource available now, so queue the request
+ if (queueIfNoResource) {
+ SlotAllocationFuture future = new SlotAllocationFuture();
+ this.taskQueue.add(new QueuedTask(task, future));
+ return future;
+ }
+ else {
+ throw new NoResourceAvailableException(task);
+ }
+ }
}
- // else: the slot was deallocated, we need to proceed as if there was none
}
+ }
- // check if there is a slot that has an available sub-slot for that group-vertex
- // TODO
+ /**
+ * Picks an instance with a free slot for the given vertex, honoring the vertex's preferred locations.
+ * <p>
+ * If the vertex reports preferred locations and one of them is in the set of instances with available
+ * resources, that instance is taken out of the set and returned (local assignment). If none of them is
+ * in the set, the next instance is polled from the set instead (non-local assignment). A vertex without
+ * preferred locations simply gets the next polled instance (unconstrained assignment). In every case the
+ * returned instance has been removed from the set; callers re-add it if it still has free slots.
+ * <p>
+ * NOTE: This method is not thread-safe; the caller must synchronize on the scheduler's lock.
+ *
+ * @param vertex The task to run.
+ * @return The instance to run the vertex on, or {@code null}, if no instance is available.
+ */
+ protected Instance getFreeInstanceForTask(ExecutionVertex2 vertex) {
+ if (this.instancesWithAvailableResources.isEmpty()) {
+ return null;
+ }
+
+ Iterable<Instance> preferred = vertex.getPreferredLocations();
+ Iterator<Instance> candidates = (preferred == null) ? null : preferred.iterator();
+
+ if (candidates == null || !candidates.hasNext()) {
+ // the vertex expresses no locality preference at all
+ Instance anyInstance = this.instancesWithAvailableResources.poll();
+ unconstrainedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + anyInstance);
+ }
+ return anyInstance;
+ }
+
+ // try the preferred locations in the order the vertex reports them
+ while (candidates.hasNext()) {
+ Instance candidate = candidates.next();
+ if (candidate == null || !this.instancesWithAvailableResources.remove(candidate)) {
+ continue;
+ }
+ localizedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + candidate);
+ }
+ return candidate;
+ }
+
+ // none of the preferred locations has a free slot: fall back to any available instance
+ Instance fallback = this.instancesWithAvailableResources.poll();
+ nonLocalizedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + fallback);
+ }
+ return fallback;
+ }
+
+ /**
+ * Called when the given instance has a free slot, e.g. after one of its slots was returned, or once
+ * per free slot when the instance is registered with this scheduler.
+ * <p>
+ * If a task is waiting in the queue, a slot is allocated on the instance for the head of the queue.
+ * If the task accepts the slot, it is removed from the queue and its future receives the slot.
+ * If no task is waiting, the instance is put (back) into the set of instances with available resources.
+ *
+ * @param instance The instance that has a slot available.
+ */
+ @Override
+ public void newSlotAvailable(Instance instance) {
+
+ // global lock before instance lock, so that the order of acquiring locks is always 1) global, 2) instance
+ synchronized (globalLock) {
+ QueuedTask queued = taskQueue.peek();
+
+ // the slot was properly released, we can allocate a new one from that instance
+
+ if (queued != null) {
+ ScheduledUnit task = queued.getTask();
+
+ try {
+ AllocatedSlot newSlot = instance.allocateSlot(task.getTaskVertex().getJobId());
+ // NOTE(review): if runTask() returns false, newSlot is neither used nor released here;
+ // confirm that AllocatedSlot cleans itself up in that case.
+ if (newSlot != null && newSlot.runTask(task.getTaskVertex())) {
+
+ // success, remove from the task queue and notify the future
+ taskQueue.poll();
+ if (queued.getFuture() != null) {
+ queued.getFuture().setSlot(newSlot);
+ }
+ }
+ }
+ catch (InstanceDiedException e) {
+ // a dead instance must be in neither pool, same as in scheduleTask() and instanceDied()
+ this.allInstances.remove(instance);
+ this.instancesWithAvailableResources.remove(instance);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Instance " + instance + " was marked dead asynchronously.");
+ }
+ }
+ }
+ else {
+ this.instancesWithAvailableResources.add(instance);
+ }
+ }
}
// --------------------------------------------------------------------------------------------
// Instance Availability
// --------------------------------------------------------------------------------------------
-
@Override
public void newInstanceAvailable(Instance instance) {
if (instance == null) {
@@ -195,15 +318,28 @@ public class DefaultScheduler implements InstanceListener {
}
// synchronize globally for instance changes
- synchronized (this.lock) {
+ synchronized (this.globalLock) {
+
// check we do not already use this instance
if (!this.allInstances.add(instance)) {
throw new IllegalArgumentException("The instance is already contained.");
}
+ try {
+ instance.setSlotAvailabilityListener(this);
+ }
+ catch (IllegalStateException e) {
+ // without the listener we would never learn about returned slots: do not use this instance
+ this.allInstances.remove(instance);
+ LOG.error("Scheduler could not attach to the instance as a listener.");
+ return;
+ }
+
+
// add it to the available resources and let potential waiters know
this.instancesWithAvailableResources.add(instance);
- this.lock.notifyAll();
+
+ // Serve queued tasks once per free slot. The count is captured up front because every
+ // newSlotAvailable() call may allocate a slot and thereby shrink getNumberOfAvailableSlots(),
+ // which would otherwise end the loop early and leave tasks queued despite free slots.
+ final int numFreeSlots = instance.getNumberOfAvailableSlots();
+ for (int i = 0; i < numFreeSlots; i++) {
+ newSlotAvailable(instance);
+ }
}
}
@@ -216,173 +352,57 @@ public class DefaultScheduler implements InstanceListener {
instance.markDead();
// we only remove the instance from the pools, we do not care about the
- synchronized (this.lock) {
+ synchronized (this.globalLock) {
// the instance must not be available anywhere any more
this.allInstances.remove(instance);
this.instancesWithAvailableResources.remove(instance);
}
}
+ // --------------------------------------------------------------------------------------------
+ // Status reporting
+ // --------------------------------------------------------------------------------------------
+
public int getNumberOfAvailableInstances() {
- synchronized (lock) {
- return allInstances.size();
- }
+ return allInstances.size();
}
- // --------------------------------------------------------------------------------------------
- // Scheduling
- // --------------------------------------------------------------------------------------------
+ public int getNumberOfInstancesWithAvailableSlots() {
+ return instancesWithAvailableResources.size();
+ }
-// /**
-// * Schedules the given unit to an available resource. This call blocks if no resource
-// * is currently available
-// *
-// * @param unit The unit to be scheduled.
-// */
-// protected void scheduleQueuedUnit(ScheduledUnit unit) {
-// if (unit == null) {
-// throw new IllegalArgumentException("Unit to schedule must not be null.");
-// }
-//
-// // see if the resource Id has already an assigned resource
-// AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
-//
-// if (resource == null) {
-// // not yet allocated. find a slot to schedule to
-// try {
-// resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
-// if (resource == null) {
-// throw new RuntimeException("Error: The resource to schedule to is null.");
-// }
-// }
-// catch (Exception e) {
-// // we cannot go on, the task needs to know what to do now.
-// unit.getTaskVertex().handleException(e);
-// return;
-// }
-// }
-//
-// resource.runTask(unit.getTaskVertex());
-// }
+ public int getNumberOfUnconstrainedAssignments() {
+ return unconstrainedAssignments;
+ }
- /**
- * Acquires a resource to schedule the given unit to. This call may block if no
- * resource is currently available, or throw an exception, based on the given flag.
- *
- * @param unit The unit to find a resource for.
- * @return The resource to schedule the execution of the given unit on.
- *
- * @throws NoResourceAvailableException If the {@code exceptionOnNoAvailability} flag is true and the scheduler
- * has currently no resources available.
- */
- protected AllocatedSlot getNewSlotForTask(ScheduledUnit unit, boolean queueIfNoResource)
- throws NoResourceAvailableException
- {
- synchronized (this.lock) {
- Instance instanceToUse = this.instancesWithAvailableResources.poll();
-
- // if there is nothing, throw an exception or wait, depending on what is configured
- if (instanceToUse != null) {
- try {
- AllocatedSlot slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
-
- // if the instance has further available slots, re-add it to the set of available resources.
- if (instanceToUse.hasResourcesAvailable()) {
- this.instancesWithAvailableResources.add(instanceToUse);
- }
-
- if (slot != null) {
- AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
- if (previous != null) {
- // concurrently, someone allocated a slot for that ID
- // release the new one
- slot.cancelResource();
- slot = previous;
- }
- }
- // else fall through the loop
- }
- catch (InstanceDiedException e) {
- // the instance died it has not yet been propagated to this scheduler
- // remove the instance from the set of available instances
- this.allInstances.remove(instanceToUse);
- }
- }
-
-
- if (queueIfNoResource) {
- this.taskQueue.add(unit);
- }
- else {
- throw new NoResourceAvailableException(unit);
- }
- // at this point, we have an instance. request a slot from the instance
-
-
- // if the instance has further available slots, re-add it to the set of available
- // resources.
- // if it does not, but asynchronously a slot became available, we may attempt to add the
- // instance twice, which does not matter because of the set semantics of the "instancesWithAvailableResources"
- if (instanceToUse.hasResourcesAvailable()) {
- this.instancesWithAvailableResources.add(instanceToUse);
- }
-
- if (slot != null) {
- AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
- if (previous != null) {
- // concurrently, someone allocated a slot for that ID
- // release the new one
- slot.cancelResource();
- slot = previous;
- }
- }
- // else fall through the loop
- }
- }
-
- return slot;
+ public int getNumberOfLocalizedAssignments() {
+ return localizedAssignments;
}
- protected void runSchedulerLoop() {
- // while the scheduler is alive
- while (!shutdown.get()) {
-
- // get the next unit
- ScheduledUnit next = null;
- try {
- next = this.taskQueue.take();
- }
- catch (InterruptedException e) {
- if (shutdown.get()) {
- return;
- } else {
- LOG.error("Scheduling loop was interrupted.");
- }
- }
-
- // if we see this special unit, it means we are done
- if (next == TERMINATION_SIGNAL) {
- return;
- }
-
- // deploy the next scheduling unit
- try {
- scheduleNextUnit(next);
- }
- catch (Throwable t) {
- // ignore the errors in the presence of a shutdown
- if (!shutdown.get()) {
- if (t instanceof Error) {
- throw (Error) t;
- } else if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- } else {
- throw new RuntimeException("Critical error in scheduler thread.", t);
- }
- }
- }
- }
+ public int getNumberOfNonLocalizedAssignments() {
+ return nonLocalizedAssignments;
}
- private static final ScheduledUnit TERMINATION_SIGNAL = new ScheduledUnit();
+ // --------------------------------------------------------------------------------------------
+
+ private static final class QueuedTask {
+
+ private final ScheduledUnit task;
+
+ private final SlotAllocationFuture future;
+
+
+ public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+ this.task = task;
+ this.future = future;
+ }
+
+ public ScheduledUnit getTask() {
+ return task;
+ }
+
+ public SlotAllocationFuture getFuture() {
+ return future;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
deleted file mode 100644
index 47aadf9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/InstanceFillDegreeComparator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.Comparator;
-
-import org.apache.flink.runtime.instance.Instance;
-
-public class InstanceFillDegreeComparator implements Comparator<Instance> {
-
- @Override
- public int compare(Instance o1, Instance o2) {
- float fraction1 = o1.getNumberOfAvailableSlots() / (float) o1.getTotalNumberOfSlots();
- float fraction2 = o2.getNumberOfAvailableSlots() / (float) o2.getTotalNumberOfSlots();
-
- return fraction1 < fraction2 ? -1 : 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 338529f..2b0de6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -1,71 +1,75 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.jobmanager.scheduler;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
public class ScheduledUnit {
- private final JobID jobId;
-
private final ExecutionVertex2 taskVertex;
- private final ResourceId resourceId;
-
- private final AtomicBoolean scheduled = new AtomicBoolean(false);
+ private final SlotSharingGroup sharingGroup;
+ // --------------------------------------------------------------------------------------------
- public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex) {
- this(jobId, taskVertex, new ResourceId());
+ public ScheduledUnit(ExecutionVertex2 taskVertex) {
+ if (taskVertex == null) {
+ throw new NullPointerException();
+ }
+
+ this.taskVertex = taskVertex;
+ this.sharingGroup = null;
}
- public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex, ResourceId resourceId) {
- if (jobId == null || taskVertex == null || resourceId == null) {
+ public ScheduledUnit(ExecutionVertex2 taskVertex, SlotSharingGroup sharingUnit) {
+ if (taskVertex == null) {
throw new NullPointerException();
}
- this.jobId = jobId;
this.taskVertex = taskVertex;
- this.resourceId = resourceId;
+ this.sharingGroup = sharingUnit;
}
ScheduledUnit() {
- this.jobId = null;
this.taskVertex = null;
- this.resourceId = null;
+ this.sharingGroup = null;
}
+ // --------------------------------------------------------------------------------------------
- public JobID getJobId() {
- return jobId;
+ public JobVertexID getJobVertexId() {
+ return this.taskVertex.getJobvertexId();
}
public ExecutionVertex2 getTaskVertex() {
return taskVertex;
}
- public ResourceId getSharedResourceId() {
- return resourceId;
+ public SlotSharingGroup getSlotSharingGroup() {
+ return sharingGroup;
}
+
+ // --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return "(job=" + jobId + ", resourceId=" + resourceId + ", vertex=" + taskVertex + ')';
+ return "{vertex=" + taskVertex.getSimpleName() + ", sharingUnit=" + sharingGroup + '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
deleted file mode 100644
index 3bca46b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulingStrategy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-/**
- * The scheduling strategy describes how scheduler distributes tasks across resources.
- */
-public enum SchedulingStrategy {
-
- /**
- * This strategy tries to keep all machines utilized roughly the same.
- */
- SPREAD_OUT_TASKS,
-
- /**
- * This strategy will put as many tasks on one each machine as possible, before putting
- * tasks on the next machine.
- */
- CLUSTER_TASKS
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
new file mode 100644
index 0000000..56b1146
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SetQueue.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Queue;
+
/**
 * A {@link Queue} with set semantics: an element that is already contained in the queue
 * is not inserted a second time and keeps its original position.
 * <p>
 * Elements are stored in a {@link LinkedHashSet}, so {@link #poll()} and {@link #peek()}
 * hand out elements in the order in which they were first inserted. {@code null} elements
 * are rejected with a {@link NullPointerException}.
 * <p>
 * Note that {@link #offer(Object)}, and therefore the inherited {@link #add(Object)},
 * reports {@code true} even when the element was already present and the queue did not
 * change.
 * <p>
 * This class performs no synchronization of its own.
 *
 * @param <E> The type of the elements in the queue.
 */
public class SetQueue<E> extends AbstractQueue<E> implements Queue<E> {

	/** Backing storage; its iteration order is the queue order. */
	private final LinkedHashSet<E> elements = new LinkedHashSet<E>();

	/**
	 * Appends the element to the tail of the queue, unless it is already contained.
	 *
	 * @param e The element to insert.
	 * @return Always {@code true}; the queue is unbounded and never rejects an element.
	 * @throws NullPointerException If {@code e} is {@code null}.
	 */
	@Override
	public boolean offer(E e) {
		if (e == null) {
			throw new NullPointerException();
		}

		// a duplicate leaves the set untouched, which still counts as a successful offer
		elements.add(e);
		return true;
	}

	/**
	 * Removes and returns the head of the queue.
	 *
	 * @return The oldest element, or {@code null} if the queue is empty.
	 */
	@Override
	public E poll() {
		Iterator<E> cursor = elements.iterator();
		if (!cursor.hasNext()) {
			return null;
		}

		E head = cursor.next();
		cursor.remove();
		return head;
	}

	/**
	 * Returns the head of the queue without removing it.
	 *
	 * @return The oldest element, or {@code null} if the queue is empty.
	 */
	@Override
	public E peek() {
		Iterator<E> cursor = elements.iterator();
		return cursor.hasNext() ? cursor.next() : null;
	}

	/** Returns an iterator over the elements in queue order; it supports {@code remove()}. */
	@Override
	public Iterator<E> iterator() {
		return elements.iterator();
	}

	@Override
	public int size() {
		return elements.size();
	}

	@Override
	public void clear() {
		elements.clear();
	}

	@Override
	public boolean remove(Object o) {
		return elements.remove(o);
	}

	@Override
	public boolean contains(Object o) {
		return elements.contains(o);
	}

	@Override
	public boolean removeAll(Collection<?> c) {
		return elements.removeAll(c);
	}

	@Override
	public boolean containsAll(Collection<?> c) {
		return elements.containsAll(c);
	}

	@Override
	public boolean retainAll(Collection<?> c) {
		return elements.retainAll(c);
	}

	// --------------------------------------------------------------------------------------------

	/** Hash code of the contained elements, as defined by {@link java.util.Set#hashCode()}. */
	@Override
	public int hashCode() {
		return elements.hashCode();
	}

	/**
	 * Two queues are equal if the other object is exactly of class {@code SetQueue} and both
	 * contain the same elements (set equality, order is not compared).
	 */
	@Override
	public boolean equals(Object obj) {
		if (obj == null || obj.getClass() != SetQueue.class) {
			return false;
		}
		SetQueue<?> other = (SetQueue<?>) obj;
		return elements.equals(other.elements);
	}

	@Override
	public String toString() {
		return elements.toString();
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
new file mode 100644
index 0000000..36d8a8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlot.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+public class SharedSlot {
+
+ private final AllocatedSlot allocatedSlot;
+
+ private final SlotSharingGroupAssignment assignmentGroup;
+
+ private final Set<SubSlot> subSlots;
+
+ private int subSlotNumber;
+
+ private volatile boolean disposed;
+
+ // --------------------------------------------------------------------------------------------
+
+ public SharedSlot(AllocatedSlot allocatedSlot, SlotSharingGroupAssignment assignmentGroup) {
+ if (allocatedSlot == null || assignmentGroup == null) {
+ throw new NullPointerException();
+ }
+
+ this.allocatedSlot = allocatedSlot;
+ this.assignmentGroup = assignmentGroup;
+ this.subSlots = new HashSet<SubSlot>();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public AllocatedSlot getAllocatedSlot() {
+ return this.allocatedSlot;
+ }
+
+ public boolean isDisposed() {
+ return disposed;
+ }
+
+ public int getNumberOfAllocatedSubSlots() {
+ synchronized (this.subSlots) {
+ return this.subSlots.size();
+ }
+ }
+
+ public SubSlot allocateSubSlot(JobVertexID jid) {
+ synchronized (this.subSlots) {
+ if (isDisposed()) {
+ return null;
+ } else {
+ SubSlot ss = new SubSlot(this, subSlotNumber++, jid);
+ this.subSlots.add(ss);
+ return ss;
+ }
+ }
+ }
+
+ void returnAllocatedSlot(SubSlot slot) {
+ boolean release;
+
+ synchronized (this.subSlots) {
+ if (!this.subSlots.remove(slot)) {
+ throw new IllegalArgumentException("Wrong shared slot for subslot.");
+ }
+
+ release = assignmentGroup.sharedSlotAvailableForJid(this, slot.getJobVertexId(), this.subSlots.isEmpty());
+
+ if (release) {
+ disposed = true;
+ }
+ }
+
+ // do this call outside the lock, because releasing the allocated slot may go into further scheduler calls
+ if (release) {
+ this.allocatedSlot.releaseSlot();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e6aadfcc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
new file mode 100644
index 0000000..679cccc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.AllocatedSlot;
+
+public class SlotAllocationFuture {
+
+ private final Object monitor = new Object();
+
+ private volatile AllocatedSlot slot;
+
+ private volatile SlotAllocationFutureAction action;
+
+ // --------------------------------------------------------------------------------------------
+
+ public SlotAllocationFuture() {}
+
+ public SlotAllocationFuture(AllocatedSlot slot) {
+ this.slot = slot;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public AllocatedSlot waitTillAllocated() throws InterruptedException {
+ return waitTillAllocated(0);
+ }
+
+ public AllocatedSlot waitTillAllocated(long timeout) throws InterruptedException {
+ synchronized (monitor) {
+ while (slot == null) {
+ monitor.wait(timeout);
+ }
+
+ return slot;
+ }
+ }
+
+ public void setFutureAction(SlotAllocationFutureAction action) {
+ synchronized (monitor) {
+ if (this.action != null) {
+ throw new IllegalStateException("Future already has an action registered.");
+ }
+
+ this.action = action;
+
+ if (this.slot != null) {
+ action.slotAllocated(this.slot);
+ }
+ }
+ }
+
+ public void setSlot(AllocatedSlot slot) {
+ if (slot == null) {
+ throw new NullPointerException();
+ }
+
+ synchronized (monitor) {
+ if (this.slot != null) {
+ throw new IllegalStateException("The future has already been assigned a slot.");
+ }
+
+ this.slot = slot;
+ monitor.notifyAll();
+
+ if (action != null) {
+ action.slotAllocated(slot);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return slot == null ? "PENDING" : "DONE";
+ }
+}