You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:33 UTC
[09/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
deleted file mode 100644
index 08a03bc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-/**
- * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobOutputVertex must not have any further output.
- */
-public class OutputFormatOutputVertex extends AbstractJobOutputVertex {
- /**
- * Contains the output format associated to this output vertex. It can be <pre>null</pre>.
- */
- private OutputFormat<?> outputFormat;
-
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public OutputFormatOutputVertex(String name, JobGraph jobGraph) {
- this(name, null, jobGraph);
- }
-
- public OutputFormatOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public OutputFormatOutputVertex(JobGraph jobGraph) {
- this(null, jobGraph);
- }
-
- public void setOutputFormat(OutputFormat<?> format) {
- this.outputFormat = format;
- }
-
- public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
- TaskConfig cfg = new TaskConfig(getConfiguration());
- UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
-
- if (wrapper != null) {
- this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
- this.outputFormat.configure(cfg.getStubParameters());
- }
- }
-
- /**
- * Returns the output format. It can also be <pre>null</pre>.
- *
- * @return output format or <pre>null</pre>
- */
- public OutputFormat<?> getOutputFormat() { return outputFormat; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
new file mode 100644
index 0000000..029d109
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+
+/**
+ * A task vertex that runs an initialization on the master, trying to deserialize an output format
+ * and initializing it on master, if necessary.
+ */
+public class OutputFormatVertex extends AbstractJobVertex {
+
+ private static final long serialVersionUID = 1L;
+
+
+ /** Caches the output format associated to this output vertex. */
+ private transient OutputFormat<?> outputFormat;
+
+ /**
+ * Creates a new task vertex with the specified name.
+ *
+ * @param name The name of the task vertex.
+ */
+ public OutputFormatVertex(String name) {
+ super(name);
+ }
+
+
+ @Override
+ public void initializeOnMaster(ClassLoader loader) throws Exception {
+ if (this.outputFormat == null) {
+ TaskConfig cfg = new TaskConfig(getConfiguration());
+ UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+
+ if (wrapper == null) {
+ throw new Exception("No output format present in OutputFormatVertex's task configuration.");
+ }
+
+ this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+ this.outputFormat.configure(cfg.getStubParameters());
+ }
+
+ if (this.outputFormat instanceof InitializeOnMaster) {
+ ((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
deleted file mode 100644
index 3699f0e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-import org.apache.flink.core.io.InputSplit;
-
-
-public class SimpleInputVertex extends AbstractJobInputVertex {
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public SimpleInputVertex(String name, JobGraph jobGraph) {
- this(name, null, jobGraph);
- }
-
- public SimpleInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public SimpleInputVertex(JobGraph jobGraph) {
- this(null, jobGraph);
- }
-
- @Override
- public Class<? extends InputSplit> getInputSplitType() {
- return null;
- }
-
- @Override
- public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
deleted file mode 100644
index 8709a07..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph;
-
-/**
- * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobOutputVertex must not have any further output.
- */
-public class SimpleOutputVertex extends AbstractJobOutputVertex {
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public SimpleOutputVertex(String name, JobGraph jobGraph) {
- this(name, null, jobGraph);
- }
-
- public SimpleOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public SimpleOutputVertex(JobGraph jobGraph) {
- this(null, jobGraph);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index d3ad516..aab9c89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -29,7 +29,7 @@ public abstract class AbstractInvokable {
/**
* The environment assigned to this invokable.
*/
- private volatile Environment environment = null;
+ private volatile Environment environment;
/**
* Must be overwritten by the concrete task to instantiate the required record reader and record writer.
@@ -60,7 +60,6 @@ public abstract class AbstractInvokable {
*
* @return the environment of this task or <code>null</code> if the environment has not yet been set
*/
- // TODO: This method should be final
public Environment getEnvironment() {
return this.environment;
}
@@ -72,7 +71,6 @@ public abstract class AbstractInvokable {
* @return the current number of subtasks the respective task is split into
*/
public final int getCurrentNumberOfSubtasks() {
-
return this.environment.getCurrentNumberOfSubtasks();
}
@@ -82,7 +80,6 @@ public abstract class AbstractInvokable {
* @return the index of this subtask in the subtask group
*/
public final int getIndexInSubtaskGroup() {
-
return this.environment.getIndexInSubtaskGroup();
}
@@ -92,7 +89,6 @@ public abstract class AbstractInvokable {
* @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}
*/
public final Configuration getTaskConfiguration() {
-
return this.environment.getTaskConfiguration();
}
@@ -102,40 +98,10 @@ public abstract class AbstractInvokable {
* @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}
*/
public final Configuration getJobConfiguration() {
-
return this.environment.getJobConfiguration();
}
/**
- * This method should be called by the user code if a custom
- * user thread has been started.
- *
- * @param userThread
- * the user thread which has been started
- */
- public final void userThreadStarted(Thread userThread) {
-
- if (this.environment != null) {
- this.environment.userThreadStarted(userThread);
- }
-
- }
-
- /**
- * This method should be called by the user code if a custom
- * user thread has finished.
- *
- * @param userThread
- * the user thread which has finished
- */
- public final void userThreadFinished(Thread userThread) {
-
- if (this.environment != null) {
- this.environment.userThreadFinished(userThread);
- }
- }
-
- /**
* This method is called when a task is canceled either as a result of a user abort or an execution failure. It can
* be overwritten to respond to shut down the user code properly.
*
@@ -143,7 +109,6 @@ public abstract class AbstractInvokable {
* thrown if any exception occurs during the execution of the user code
*/
public void cancel() throws Exception {
-
// The default implementation does nothing.
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
index 7aa3374..94e6cab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import org.apache.flink.core.io.InputSplit;
/**
- * The input split iterator allows an {@link AbstractInputTask} to iterator over all input splits it is supposed to
+ * The input split iterator allows a task to iterate over all input splits it is supposed to
* consume. Internally, the input split iterator calls an {@link InputSplitProvider} on each <code>next</code> call in
* order to facilitate lazy split assignment.
*
@@ -72,7 +72,6 @@ public class InputSplitIterator<T extends InputSplit> implements Iterator<T> {
@SuppressWarnings("unchecked")
@Override
public T next() {
-
T retVal = null;
if (this.nextInputSplit == null) {
@@ -88,8 +87,6 @@ public class InputSplitIterator<T extends InputSplit> implements Iterator<T> {
@Override
public void remove() {
-
throw new RuntimeException("The InputSplitIterator does not implement the remove method");
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
index 22722e7..20a4ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
@@ -16,23 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.core.io.InputSplit;
/**
- * An input split provider can be successively queried to provide a series of {@link InputSplit} objects an
- * {@link AbstractInputTask} is supposed to consume in the course of its execution.
- *
+ * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
+ * task is supposed to consume in the course of its execution.
*/
public interface InputSplitProvider {
/**
- * Requests the next input split to be consumed by the calling {@link AbstractInputTask}.
+ * Requests the next input split to be consumed by the calling task.
*
- * @return the next input split to be consumed by the calling {@link AbstractInputTask} or <code>null</code> if the
- * {@link AbstractInputTask} shall not consume any further input splits.
+ * @return the next input split to be consumed by the calling task or <code>null</code> if the
+ * task shall not consume any further input splits.
*/
InputSplit getNextInputSplit();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
deleted file mode 100644
index b8d9557..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager;
-
-import java.util.List;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * A deployment manager is responsible for deploying a list of {@link ExecutionVertex} objects the given
- * {@link org.apache.flink.runtime.instance.Instance}. It is called by a {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} implementation whenever at least one
- * {@link ExecutionVertex} has become ready to be executed.
- *
- */
-public interface DeploymentManager {
-
- /**
- * Deploys the list of vertices on the given {@link org.apache.flink.runtime.instance.Instance}.
- *
- * @param jobID
- * the ID of the job the vertices to be deployed belong to
- * @param instance
- * the instance on which the vertices shall be deployed
- * @param verticesToBeDeployed
- * the list of vertices to be deployed
- */
- void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index 6800a68..d36659f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager;
import java.util.ArrayList;
@@ -32,27 +31,20 @@ import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
import org.apache.flink.runtime.event.job.ManagementEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.event.job.VertexAssignmentEvent;
import org.apache.flink.runtime.event.job.VertexEvent;
import org.apache.flink.runtime.execution.ExecutionListener;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.ManagementGraphFactory;
-import org.apache.flink.runtime.executiongraph.VertexAssignmentListener;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertex;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.types.ProfilingEvent;
@@ -62,7 +54,6 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
* the event collector removes all intervals which are older than the interval.
* <p>
* This class is thread-safe.
- *
*/
public final class EventCollector extends TimerTask implements ProfilingListener {
@@ -72,85 +63,43 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* the data provided by the <code>executionStateChanged</code> callback method.
However, these IDs are needed to construct the {@link VertexEvent} and the
* {@link ExecutionStateChangeEvent}.
- *
*/
private static final class ExecutionListenerWrapper implements ExecutionListener {
- /**
- * The event collector to forward the created event to.
- */
+ /** The event collector to forward the created event to. */
private final EventCollector eventCollector;
- /**
- * The vertex this listener belongs to.
- */
- private final ExecutionVertex vertex;
+ private final ExecutionGraph graph;
+
- /**
- * Constructs a new execution listener object.
- *
- * @param eventCollector
- * the event collector to forward the created event to
- * @param vertex
- * the vertex this listener belongs to.
- */
- public ExecutionListenerWrapper(final EventCollector eventCollector, final ExecutionVertex vertex) {
+ public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionGraph graph) {
this.eventCollector = eventCollector;
- this.vertex = vertex;
+ this.graph = graph;
}
- /**
- * {@inheritDoc}
- */
@Override
- public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID,
- final ExecutionState newExecutionState, final String optionalMessage) {
-
+ public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
+ ExecutionState2 newExecutionState, String optionalMessage)
+ {
final long timestamp = System.currentTimeMillis();
- final JobVertexID jobVertexID = this.vertex.getGroupVertex().getJobVertexID();
- final String taskName = this.vertex.getGroupVertex().getName();
- final int totalNumberOfSubtasks = this.vertex.getGroupVertex().getCurrentNumberOfGroupMembers();
- final int indexInSubtaskGroup = this.vertex.getIndexInVertexGroup();
+ final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
+
+ final String taskName = vertex == null ? "(null)" : vertex.getJobVertex().getName();
+ final int totalNumberOfSubtasks = vertex == null ? -1 : vertex.getParallelism();
// Create a new vertex event
- final VertexEvent vertexEvent = new VertexEvent(timestamp, jobVertexID, taskName, totalNumberOfSubtasks,
- indexInSubtaskGroup, newExecutionState, optionalMessage);
+ final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks,
+ subtask, newExecutionState, optionalMessage);
this.eventCollector.addEvent(jobID, vertexEvent);
final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp,
- vertexID.toManagementVertexID(), newExecutionState);
+ vertexId.toManagementVertexId(subtask), newExecutionState);
this.eventCollector.updateManagementGraph(jobID, executionStateChangeEvent, optionalMessage);
this.eventCollector.addEvent(jobID, executionStateChangeEvent);
}
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) {
- // Nothing to do here
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getPriority() {
-
- return 20;
- }
-
}
/**
@@ -162,24 +111,16 @@ public final class EventCollector extends TimerTask implements ProfilingListener
*/
private static final class JobStatusListenerWrapper implements JobStatusListener {
- /**
- * The event collector to forward the created event to.
- */
+ /** The event collector to forward the created event to. */
private final EventCollector eventCollector;
- /**
- * The name of the job this wrapper has been created for.
- */
+ /** The name of the job this wrapper has been created for. */
private final String jobName;
- /**
- * <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise.
- */
+ /** <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise. */
private final boolean isProfilingAvailable;
- /**
- * The time stamp of the job submission
- */
+ /** The time stamp of the job submission */
private final long submissionTimestamp;
/**
@@ -194,101 +135,32 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @param submissionTimestamp
* the submission time stamp of the job
*/
- public JobStatusListenerWrapper(final EventCollector eventCollector, final String jobName,
- final boolean isProfilingAvailable, final long submissionTimestamp) {
-
+ public JobStatusListenerWrapper(EventCollector eventCollector, String jobName,
+ boolean isProfilingAvailable, long submissionTimestamp)
+ {
this.eventCollector = eventCollector;
this.jobName = jobName;
this.isProfilingAvailable = isProfilingAvailable;
this.submissionTimestamp = submissionTimestamp;
}
- /**
- * {@inheritDoc}
- */
@Override
- public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
- final String optionalMessage) {
+ public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
final JobID jobID = executionGraph.getJobID();
- if (newJobStatus == InternalJobStatus.SCHEDULED) {
+ if (newJobStatus == JobStatus.RUNNING) {
final ManagementGraph managementGraph = ManagementGraphFactory.fromExecutionGraph(executionGraph);
this.eventCollector.addManagementGraph(jobID, managementGraph);
}
// Update recent job event
- final JobStatus jobStatus = InternalJobStatus.toJobStatus(newJobStatus);
- if (jobStatus != null) {
- this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
- this.submissionTimestamp, jobStatus);
-
- this.eventCollector.addEvent(jobID,
- new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage));
- }
- }
- }
-
- /**
- * The vertex assignment listener wrapper is an auxiliary class. It is required
- * because the job ID cannot be accessed from the data provided by the <code>vertexAssignmentChanged</code> callback
- * method. However, this job ID is needed to prepare the {@link VertexAssignmentEvent} for transmission.
- *
- */
- private static final class VertexAssignmentListenerWrapper implements VertexAssignmentListener {
-
- /**
- * The event collector to forward the created event to.
- */
- private final EventCollector eventCollector;
-
- /**
- * The ID the job this wrapper has been created for.
- */
- private final JobID jobID;
-
- /**
- * Constructs a new vertex assignment listener wrapper.
- *
- * @param eventCollector
- * the event collector to forward the events to
- * @param jobID
- * the ID of the job
- */
- public VertexAssignmentListenerWrapper(final EventCollector eventCollector, final JobID jobID) {
- this.eventCollector = eventCollector;
- this.jobID = jobID;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void vertexAssignmentChanged(final ExecutionVertexID id, final AllocatedResource newAllocatedResource) {
-
- // Create a new vertex assignment event
- final ManagementVertexID managementVertexID = id.toManagementVertexID();
- final long timestamp = System.currentTimeMillis();
-
- final Instance instance = newAllocatedResource.getInstance();
- VertexAssignmentEvent event;
- if (instance == null) {
- event = new VertexAssignmentEvent(timestamp, managementVertexID, "null");
- } else {
-
- String instanceName = null;
- if (instance.getInstanceConnectionInfo() != null) {
- instanceName = instance.getInstanceConnectionInfo().toString();
- } else {
- instanceName = instance.toString();
- }
+ this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
+ this.submissionTimestamp, newJobStatus);
- event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName);
- }
-
- this.eventCollector.updateManagementGraph(jobID, event);
- this.eventCollector.addEvent(this.jobID, event);
+ this.eventCollector.addEvent(jobID,
+ new JobEvent(System.currentTimeMillis(), newJobStatus, optionalMessage));
}
}
@@ -344,8 +216,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* <code>true</code> if {@link ManagementEvent} objects shall be added to the list as well,
* <code>false</code> otherwise
*/
- public void getEventsForJob(final JobID jobID, final List<AbstractEvent> eventList,
- final boolean includeManagementEvents) {
+ public void getEventsForJob(JobID jobID, List<AbstractEvent> eventList, boolean includeManagementEvents) {
synchronized (this.collectedEvents) {
@@ -431,15 +302,15 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @param jobStatus
* the status of the job
*/
- private void updateRecentJobEvent(final JobID jobID, final String jobName, final boolean isProfilingEnabled,
- final long submissionTimestamp, final JobStatus jobStatus) {
-
+ private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled,
+ long submissionTimestamp, JobStatus jobStatus)
+ {
final long currentTime = System.currentTimeMillis();
+
final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled,
submissionTimestamp, currentTime);
synchronized (this.recentJobs) {
-
this.recentJobs.put(jobID, recentJobEvent);
}
}
@@ -448,7 +319,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* Registers a job in form of its execution graph representation
* with the job progress collector. The collector will subscribe
* to state changes of the individual subtasks. A separate
- * deregistration is not necessary since the job progress collector
+ * de-registration is not necessary since the job progress collector
* periodically discards outdated progress information.
*
* @param executionGraph
@@ -458,26 +329,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @param submissionTimestamp
* the submission time stamp of the job
*/
- public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable,
- final long submissionTimestamp) {
-
- final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(executionGraph, true);
-
- while (it.hasNext()) {
+ public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) {
- final ExecutionVertex vertex = it.next();
+ executionGraph.registerExecutionListener(new ExecutionListenerWrapper(this, executionGraph));
- // Register the listener object which will pass state changes on to the collector
- vertex.registerExecutionListener(new ExecutionListenerWrapper(this, vertex));
-
- // Register the listener object which will pass assignment changes on to the collector
- vertex.registerVertexAssignmentListener(new VertexAssignmentListenerWrapper(this, executionGraph.getJobID()));
- }
-
- // Register one job status listener wrapper for the entire job
executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(),
profilingAvailable, submissionTimestamp));
-
}
/**
@@ -547,7 +404,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
@Override
public void processProfilingEvents(final ProfilingEvent profilingEvent) {
-
// Simply add profiling events to the job's event queue
addEvent(profilingEvent.getJobID(), profilingEvent);
}
@@ -561,7 +417,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* the management graph to be added
*/
void addManagementGraph(final JobID jobID, final ManagementGraph managementGraph) {
-
synchronized (this.recentManagementGraphs) {
this.recentManagementGraphs.put(jobID, managementGraph);
}
@@ -576,38 +431,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @return the management graph for the job with the given ID or <code>null</code> if no such graph exists
*/
public ManagementGraph getManagementGraph(final JobID jobID) {
-
synchronized (this.recentManagementGraphs) {
return this.recentManagementGraphs.get(jobID);
}
}
/**
- * Applies changes in the vertex assignment to the stored management graph.
- *
- * @param jobID
- * the ID of the job whose management graph shall be updated
- * @param vertexAssignmentEvent
- * the event describing the changes in the vertex assignment
- */
- private void updateManagementGraph(final JobID jobID, final VertexAssignmentEvent vertexAssignmentEvent) {
-
- synchronized (this.recentManagementGraphs) {
-
- final ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID);
- if (managementGraph == null) {
- return;
- }
- final ManagementVertex vertex = managementGraph.getVertexByID(vertexAssignmentEvent.getVertexID());
- if (vertex == null) {
- return;
- }
-
- vertex.setInstanceName(vertexAssignmentEvent.getInstanceName());
- }
- }
-
- /**
* Applies changes in the state of an execution vertex to the stored management graph.
*
* @param jobID
@@ -615,7 +444,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @param executionStateChangeEvent
* the event describing the changes in the execution state of the vertex
*/
- private void updateManagementGraph(final JobID jobID, final ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
+ private void updateManagementGraph(JobID jobID, ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) {
synchronized (this.recentManagementGraphs) {
@@ -629,7 +458,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
}
vertex.setExecutionState(executionStateChangeEvent.getNewExecutionState());
- if (executionStateChangeEvent.getNewExecutionState() == ExecutionState.FAILED) {
+ if (optionalMessage != null) {
vertex.setOptMessage(optionalMessage);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 3b76b78..fc76d73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager;
import java.io.File;
@@ -25,11 +24,10 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -48,61 +46,49 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.io.StringRecord;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.ExecutionMode;
+import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
-import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
-import org.apache.flink.runtime.executiongraph.GraphConversionException;
-import org.apache.flink.runtime.executiongraph.InternalJobStatus;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
-import org.apache.flink.runtime.instance.DefaultInstanceManager;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.Server;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-import org.apache.flink.runtime.profiling.JobManagerProfiler;
-import org.apache.flink.runtime.profiling.ProfilingUtils;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
-import org.apache.flink.runtime.taskmanager.AbstractTaskResult;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -113,15 +99,13 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
+import com.google.common.base.Preconditions;
+
/**
- * In Nephele the job manager is the central component for communication with clients, creating
- * schedules for incoming jobs and supervise their execution. A job manager may only exist once in
- * the system and its address must be known the clients.
- * Task managers can discover the job manager by means of an UDP broadcast and afterwards advertise
- * themselves as new workers for tasks.
- *
+ * The JobManager is the master that coordinates the distributed execution.
+ * It receives jobs from clients, tracks the distributed execution.
*/
-public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
+public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
@@ -130,32 +114,46 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private final static int FAILURE_RETURN_CODE = 1;
+ /** Executor service for asynchronous commands (to relieve the RPC threads of work) */
private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
- private final Server jobManagerServer;
- private final EventCollector eventCollector;
-
- private final ArchiveListener archive;
+ /** The RPC end point through which the JobManager gets its calls */
+ private final Server jobManagerServer;
+ /** Keeps track of the currently available task managers */
private final InstanceManager instanceManager;
+ /** Assigns tasks to slots and keeps track on available and allocated task slots*/
private final DefaultScheduler scheduler;
+ /** The currently running jobs */
+ private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
+
+
+ // begin: these will be consolidated / removed
+ private final EventCollector eventCollector;
+
+ private final ArchiveListener archive;
+
private final AccumulatorManager accumulatorManager;
-
private final int recommendedClientPollingInterval;
-
+ // end: these will be consolidated / removed
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
-
+
private volatile boolean isShutDown;
private WebInfoServer server;
+ // --------------------------------------------------------------------------------------------
+ // Initialization & Shutdown
+ // --------------------------------------------------------------------------------------------
+
public JobManager(ExecutionMode executionMode) throws Exception {
final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
@@ -190,6 +188,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
this.archive = null;
}
+ this.currentJobs = new ConcurrentHashMap<JobID, ExecutionGraph>();
+
// Create the accumulator manager, with same archiving limit as web
// interface. We need to store the accumulators for at least one job.
// Otherwise they might be deleted before the client requested the
@@ -218,21 +218,15 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
this.instanceManager = new LocalInstanceManager(numTaskManagers);
}
else if (executionMode == ExecutionMode.CLUSTER) {
- this.instanceManager = new DefaultInstanceManager();
+ this.instanceManager = new InstanceManager();
}
else {
throw new IllegalArgumentException("ExecutionMode");
}
- // Try to load the scheduler for the given execution mode
- final String schedulerClassName = JobManagerUtils.getSchedulerClassName(executionMode);
- LOG.info("Trying to load " + schedulerClassName + " as scheduler");
-
- // Try to get the instance manager class name
- this.scheduler = JobManagerUtils.loadScheduler(schedulerClassName, this, this.instanceManager);
- if (this.scheduler == null) {
- throw new Exception("Unable to load scheduler " + schedulerClassName);
- }
+ // create the scheduler and make it listen at the availability of new instances
+ this.scheduler = new DefaultScheduler();
+ this.instanceManager.addInstanceListener(this.scheduler);
}
public void shutdown() {
@@ -275,393 +269,223 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LOG.debug("Shutdown of job manager completed");
}
- /**
- * Entry point for the program
- *
- * @param args
- * arguments from the command line
- */
-
- public static void main(String[] args) {
- // determine if a valid log4j config exists and initialize a default logger if not
- if (System.getProperty("log4j.configuration") == null) {
- Logger root = Logger.getRootLogger();
- root.removeAllAppenders();
- PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
- ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
- root.addAppender(appender);
- root.setLevel(Level.INFO);
- }
-
- JobManager jobManager;
- try {
- jobManager = initialize(args);
- // Start info server for jobmanager
- jobManager.startInfoServer();
- }
- catch (Exception e) {
- LOG.fatal(e.getMessage(), e);
- System.exit(FAILURE_RETURN_CODE);
- }
-
- // Clean up is triggered through a shutdown hook
- // freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
- Object w = new Object();
- synchronized (w) {
- try {
- w.wait();
- } catch (InterruptedException e) {}
- }
- }
-
- @SuppressWarnings("static-access")
- public static JobManager initialize(String[] args) throws Exception {
- final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
- .withDescription("Specify configuration directory.").create("configDir");
-
- final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg()
- .withDescription("Specify execution mode.").create("executionMode");
-
- final Options options = new Options();
- options.addOption(configDirOpt);
- options.addOption(executionModeOpt);
-
- CommandLineParser parser = new GnuParser();
- CommandLine line = null;
- try {
- line = parser.parse(options, args);
- } catch (ParseException e) {
- LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
- System.exit(FAILURE_RETURN_CODE);
- }
-
- final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
- final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");
-
- ExecutionMode executionMode = null;
- if ("local".equals(executionModeName)) {
- executionMode = ExecutionMode.LOCAL;
- } else if ("cluster".equals(executionModeName)) {
- executionMode = ExecutionMode.CLUSTER;
- } else {
- System.err.println("Unrecognized execution mode: " + executionModeName);
- System.exit(FAILURE_RETURN_CODE);
- }
-
- // print some startup environment info, like user, code revision, etc
- EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
-
- // First, try to load global configuration
- GlobalConfiguration.loadConfiguration(configDir);
-
- // Create a new job manager object
- JobManager jobManager = new JobManager(executionMode);
-
- // Set base dir for info server
- Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
- if (configDir != null && new File(configDir).isDirectory()) {
- infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/..");
- }
- GlobalConfiguration.includeConfiguration(infoserverConfig);
- return jobManager;
- }
-
+ // --------------------------------------------------------------------------------------------
+ // Job Execution
+ // --------------------------------------------------------------------------------------------
@Override
public JobSubmissionResult submitJob(JobGraph job) throws IOException {
+
+ boolean success = false;
+
try {
// First check if job is null
if (job == null) {
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Submitted job " + job.getName() + " is not null");
- }
-
- // Check if any vertex of the graph has null edges
- AbstractJobVertex jv = job.findVertexWithNullEdges();
- if (jv != null) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Vertex "
- + jv.getName() + " has at least one null edge");
- return result;
+ if (LOG.isInfoEnabled()) {
+ LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName()));
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Submitted job " + job.getName() + " has no null edges");
+
+ // get the existing execution graph (if we attach), or construct a new empty one to attach
+ ExecutionGraph executionGraph = this.currentJobs.get(job.getJobID());
+ if (executionGraph == null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
+ }
+
+ executionGraph = new ExecutionGraph(job.getJobID(), job.getName(), job.getJobConfiguration(), this.executorService);
+ ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
+ if (previous != null) {
+ throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
+ }
}
-
- // Next, check if the graph is weakly connected
- if (!job.isWeaklyConnected()) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
- "Job graph is not weakly connected");
- return result;
+ else {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID()));
+ }
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("The graph of job " + job.getName() + " is weakly connected");
+
+ // grab the class loader for user-defined code
+ final ClassLoader userCodeLoader = LibraryCacheManager.getClassLoader(job.getJobID());
+ if (userCodeLoader == null) {
+ throw new JobException("The user code class loader could not be initialized.");
}
-
- // Check if job graph has cycles
- if (!job.isAcyclic()) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
- "Job graph is not a DAG");
- return result;
+
+ String[] jarFilesForJob = LibraryCacheManager.getRequiredJarFiles(job.getJobID());
+ for (String fileId : jarFilesForJob) {
+ executionGraph.addUserCodeJarFile(fileId);
}
-
+
+ // first, perform the master initialization of the nodes
if (LOG.isDebugEnabled()) {
- LOG.debug("The graph of job " + job.getName() + " is acyclic");
- }
-
- // Check constrains on degree
- jv = job.areVertexDegreesCorrect();
- if (jv != null) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
- "Degree of vertex " + jv.getName() + " is incorrect");
- return result;
+ LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("All vertices of job " + job.getName() + " have the correct degree");
+ try {
+ for (AbstractJobVertex vertex : job.getVertices()) {
+ // check that the vertex has an executable class
+ String executableClass = vertex.getInvokableClassName();
+ if (executableClass == null || executableClass.length() == 0) {
+ throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
+ }
+
+ // master side initialization
+ vertex.initializeOnMaster(userCodeLoader);
+ }
}
-
- if (!job.isInstanceDependencyChainAcyclic()) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
- "The dependency chain for instance sharing contains a cycle");
-
- return result;
+ catch (FileNotFoundException e) {
+ LOG.error("File-not-Found: " + e.getMessage());
+ return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage());
}
-
+
+ // first topologically sort the job vertices to form the basis of creating the execution graph
+ List<AbstractJobVertex> topoSorted = job.getVerticesSortedTopologicallyFromSources();
+
+ // first convert this job graph to an execution graph
if (LOG.isDebugEnabled()) {
- LOG.debug("The dependency chain for instance sharing is acyclic");
+ LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", topoSorted.size(), job.getJobID(), job.getName()));
}
-
- // Try to create initial execution graph from job graph
- LOG.info("Creating initial execution graph from job graph " + job.getName());
- ExecutionGraph eg;
-
- try {
- eg = new ExecutionGraph(job, 1);
- } catch (GraphConversionException e) {
- if (e.getCause() == null) {
- return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
- } else {
- Throwable t = e.getCause();
- if (t instanceof FileNotFoundException) {
- return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, t.getMessage());
- } else {
- return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
- }
- }
+
+ executionGraph.attachJobGraph(topoSorted);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName()));
}
// Register job with the progress collector
if (this.eventCollector != null) {
- this.eventCollector.registerJob(eg, false, System.currentTimeMillis());
+ this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis());
}
// Register for updates on the job status
- eg.registerJobStatusListener(this);
+ executionGraph.registerJobStatusListener(this);
// Schedule job
if (LOG.isInfoEnabled()) {
LOG.info("Scheduling job " + job.getName());
}
- try {
- this.scheduler.scheduleJob(eg);
- } catch (SchedulingException e) {
- unregisterJob(eg);
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));
- return result;
- }
+ executionGraph.scheduleForExecution(this.scheduler);
// Return on success
+ success = true;
return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null);
}
catch (Throwable t) {
LOG.error("Job submission failed.", t);
return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t));
}
- }
-
-
- public InstanceManager getInstanceManager() {
- return this.instanceManager;
- }
-
- /**
- * This method is a convenience method to unregister a job from all of
- * Nephele's monitoring, profiling and optimization components at once.
- * Currently, it is only being used to unregister from profiling (if activated).
- *
- * @param executionGraph
- * the execution graph to remove from the job manager
- */
- private void unregisterJob(final ExecutionGraph executionGraph) {
-
- // Remove job from profiler (if activated)
- if (this.profiler != null
- && executionGraph.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
- this.profiler.unregisterProfilingJob(executionGraph);
-
- if (this.eventCollector != null) {
- this.profiler.unregisterFromProfilingData(executionGraph.getJobID(), this.eventCollector);
- }
- }
-
- // Remove job from input split manager
- if (this.inputSplitManager != null) {
- this.inputSplitManager.unregisterJob(executionGraph);
- }
-
- // Unregister job with library cache manager
- try {
- LibraryCacheManager.unregister(executionGraph.getJobID());
- } catch (IOException ioe) {
- if (LOG.isWarnEnabled()) {
- LOG.warn(ioe);
+ finally {
+ if (!success) {
+ this.currentJobs.remove(job.getJobID());
+
+ try {
+ LibraryCacheManager.unregister(job.getJobID());
+ }
+ catch (IllegalStateException e) {
+ // may happen if the job failed before being registered at the
+ // library cache manager
+ }
+ catch (Throwable t) {
+ LOG.error("Error while de-registering job at library cache manager.", t);
+ }
}
}
}
-
@Override
- public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) {
-
- // Delegate call to instance manager
- if (this.instanceManager != null) {
-
- final Runnable heartBeatRunnable = new Runnable() {
+ public JobCancelResult cancelJob(JobID jobID) throws IOException {
- @Override
- public void run() {
- instanceManager.reportHeartBeat(instanceConnectionInfo);
- }
- };
+ LOG.info("Trying to cancel job with ID " + jobID);
- this.executorService.execute(heartBeatRunnable);
+ final ExecutionGraph eg = this.currentJobs.get(jobID);
+ if (eg == null) {
+ LOG.info("No job found with ID " + jobID);
+ return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID);
}
- }
- @Override
- public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
- final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots){
- if(this.instanceManager != null) {
- final Runnable registerTaskManagerRunnable = new Runnable() {
- @Override
- public void run(){
- instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription,
- numberOfSlots.getValue());
- }
- };
+ final Runnable cancelJobRunnable = new Runnable() {
+ @Override
+ public void run() {
+ eg.cancel();
+ }
+ };
- this.executorService.execute(registerTaskManagerRunnable);
- return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
- }
+ eg.execute(cancelJobRunnable);
- return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE);
+ return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
}
-
-
+
@Override
- public void updateTaskExecutionState(final TaskExecutionState executionState) throws IOException {
-
- // Ignore calls with executionResult == null
- if (executionState == null) {
- LOG.error("Received call to updateTaskExecutionState with executionState == null");
- return;
- }
+ public void updateTaskExecutionState(TaskExecutionState executionState) throws IOException {
+ Preconditions.checkNotNull(executionState);
- if (executionState.getExecutionState() == ExecutionState.FAILED) {
- LOG.error(executionState.getDescription());
- }
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(executionState.getJobID());
+ final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID());
if (eg == null) {
LOG.error("Cannot find execution graph for ID " + executionState.getJobID() + " to change state to "
+ executionState.getExecutionState());
return;
}
- final ExecutionVertex vertex = eg.getVertexByID(executionState.getID());
- if (vertex == null) {
- LOG.error("Cannot find vertex with ID " + executionState.getID() + " of job " + eg.getJobID()
- + " to change state to " + executionState.getExecutionState());
- return;
- }
-
- // Asynchronously update execute state of vertex
- vertex.updateExecutionStateAsynchronously(executionState.getExecutionState(), executionState.getDescription());
+ eg.updateState(executionState);
}
-
-
+
@Override
- public JobCancelResult cancelJob(final JobID jobID) throws IOException {
+ public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId) throws IOException {
- LOG.info("Trying to cancel job with ID " + jobID);
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
- if (eg == null) {
- return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID);
+ final ExecutionGraph graph = this.currentJobs.get(jobID);
+ if (graph == null) {
+ LOG.error("Cannot find execution graph to job ID " + jobID);
+ return null;
}
- final Runnable cancelJobRunnable = new Runnable() {
-
- @Override
- public void run() {
- eg.updateJobStatus(InternalJobStatus.CANCELING, "Job canceled by user");
- final TaskCancelResult cancelResult = cancelJob(eg);
- if (cancelResult != null) {
- LOG.error(cancelResult.getDescription());
- }
- }
- };
-
- eg.executeCommand(cancelJobRunnable);
-
- LOG.info("Cancel of job " + jobID + " successfully triggered");
+ final ExecutionJobVertex vertex = graph.getJobVertex(vertexId);
+ if (vertex == null) {
+ LOG.error("Cannot find execution vertex for vertex ID " + vertexId);
+ return null;
+ }
- return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null);
+ InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+ if (splitAssigner == null) {
+ LOG.error("No InputSplitAssigner for vertex ID " + vertexId);
+ return null;
+ }
+
+
+ return splitAssigner.getNextInputSplit(null);
}
+
+ @Override
+ public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
- /**
- * Cancels all the tasks in the current and upper stages of the
- * given execution graph.
- *
- * @param eg
- * the execution graph representing the job to cancel.
- * @return <code>null</code> if no error occurred during the cancel attempt,
- * otherwise the returned object will describe the error
- */
- private TaskCancelResult cancelJob(final ExecutionGraph eg) {
-
- TaskCancelResult errorResult = null;
-
- /**
- * Cancel all nodes in the current and upper execution stages.
- */
- final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(),
- false, true);
- while (it.hasNext()) {
+ final JobID jid = executionGraph.getJobID();
+
+ if (LOG.isInfoEnabled()) {
+ String message = optionalMessage == null ? "." : ": " + optionalMessage;
+ LOG.info(String.format("Status of job %s (%s) changed to %s%s",
+ jid, executionGraph.getJobName(), newJobStatus, message));
+ }
- final ExecutionVertex vertex = it.next();
- final TaskCancelResult result = vertex.cancelTask();
- if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
- errorResult = result;
+ // remove the job graph if the state is any terminal state
+ if (newJobStatus == JobStatus.FINISHED || newJobStatus == JobStatus.CANCELED || newJobStatus == JobStatus.FAILED) {
+ this.currentJobs.remove(jid);
+
+ try {
+ LibraryCacheManager.unregister(jid);
+ }
+ catch (Throwable t) {
+ LOG.warn("Could not properly unregister job " + jid + " from the library cache.");
}
}
-
- return errorResult;
}
-
@Override
public JobProgressResult getJobProgress(final JobID jobID) throws IOException {
if (this.eventCollector == null) {
- return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs",
- null);
+ return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null);
}
final SerializableArrayList<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
@@ -674,104 +498,32 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
+ final ExecutionGraph eg = this.currentJobs.get(jobID);
if (eg == null) {
LOG.error("Cannot find execution graph to job ID " + jobID);
return ConnectionInfoLookupResponse.createReceiverNotFound();
}
- final InternalJobStatus jobStatus = eg.getJobStatus();
- if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
- return ConnectionInfoLookupResponse.createJobIsAborting();
- }
-
- final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
- if (edge == null) {
- LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
- return ConnectionInfoLookupResponse.createReceiverNotFound();
- }
-
- if (sourceChannelID.equals(edge.getInputChannelID())) {
- // Request was sent from an input channel
+ return eg.lookupConnectionInfoAndDeployReceivers(caller, sourceChannelID);
+ }
- final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
-
- final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
- if (assignedInstance == null) {
- LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
- + " but no instance assigned");
- // LOG.info("Created receiverNotReady for " + connectedVertex + " 1");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- // Check execution state
- final ExecutionState executionState = connectedVertex.getExecutionState();
- if (executionState == ExecutionState.FINISHED) {
- // that should not happen. if there is data pending, the receiver cannot be ready
- return ConnectionInfoLookupResponse.createReceiverNotFound();
- }
-
- // running is common, finishing is happens when the lookup is for the close event
- if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
- // LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
- // Receiver runs on the same task manager
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
- } else {
- // Receiver runs on a different task manager
-
- final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
- final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
- }
- }
- // else, the request is for an output channel
- // Find vertex of connected input channel
- final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-
- // Check execution state
- final ExecutionState executionState = targetVertex.getExecutionState();
-
- // check whether the task needs to be deployed
- if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
-
- if (executionState == ExecutionState.ASSIGNED) {
- final Runnable command = new Runnable() {
- @Override
- public void run() {
- scheduler.deployAssignedVertices(targetVertex);
- }
- };
- eg.executeCommand(command);
- }
-
- // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
- if (assignedInstance == null) {
- LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
- // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
- // Receiver runs on the same task manager
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
- } else {
- // Receiver runs on a different task manager
- final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
- final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
- }
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Tests whether the job manager has been shut down completely.
+ *
+ * @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
+ */
+ public boolean isShutDown() {
+ return this.isShutDown;
}
-
+
+ public InstanceManager getInstanceManager() {
+ return this.instanceManager;
+ }
+
/**
* Returns current ManagementGraph from eventCollector and, if not current, from archive
*
@@ -828,239 +580,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return eventList;
}
-
- @Override
- public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
- if (eg == null) {
- LOG.error("Cannot find execution graph for job " + jobID);
- return;
- }
-
- final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
- if (vertex == null) {
- LOG.error("Cannot find execution vertex with ID " + id);
- return;
- }
-
- LOG.info("Killing task " + vertex + " of job " + jobID);
-
- final Runnable runnable = new Runnable() {
-
- @Override
- public void run() {
-
- final TaskKillResult result = vertex.killTask();
- if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
- LOG.error(result.getDescription());
- }
- }
- };
-
- eg.executeCommand(runnable);
- }
-
- /**
- * Tests whether the job manager has been shut down completely.
- *
- * @return <code>true</code> if the job manager has been shut down completely, <code>false</code> otherwise
- */
- public boolean isShutDown() {
- return this.isShutDown;
- }
-
-
-
- @Override
- public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
- final String optionalMessage) {
-
- LOG.info("Status of job " + executionGraph.getJobName() + "(" + executionGraph.getJobID() + ")"
- + " changed to " + newJobStatus);
-
- if (newJobStatus == InternalJobStatus.FAILING) {
-
- // Cancel all remaining tasks
- cancelJob(executionGraph);
- }
-
- if (newJobStatus == InternalJobStatus.CANCELED || newJobStatus == InternalJobStatus.FAILED
- || newJobStatus == InternalJobStatus.FINISHED) {
- // Unregister job for Nephele's monitoring, optimization components, and dynamic input split assignment
- unregisterJob(executionGraph);
- }
- }
-
-
- @Override
- public void logBufferUtilization(final JobID jobID) throws IOException {
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
- if (eg == null) {
- return;
- }
-
- final Set<Instance> allocatedInstance = new HashSet<Instance>();
-
- final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(eg, true);
- while (it.hasNext()) {
-
- final ExecutionVertex vertex = it.next();
- final ExecutionState state = vertex.getExecutionState();
- if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) {
- final Instance instance = vertex.getAllocatedResource().getInstance();
-
- if (instance instanceof DummyInstance) {
- LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state "
- + state + ")");
- continue;
- }
-
- allocatedInstance.add(instance);
- }
- }
-
- // Send requests to task managers from separate thread
- final Runnable requestRunnable = new Runnable() {
-
- @Override
- public void run() {
-
- final Iterator<Instance> it2 = allocatedInstance.iterator();
-
- try {
- while (it2.hasNext()) {
- it2.next().logBufferUtilization();
- }
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
-
- }
- };
-
- // Hand over to the executor service
- this.executorService.execute(requestRunnable);
- }
-
@Override
public int getAvailableSlots() {
return getInstanceManager().getTotalNumberOfSlots();
}
-
-
- @Override
- public void deploy(final JobID jobID, final Instance instance,
- final List<ExecutionVertex> verticesToBeDeployed) {
-
- if (verticesToBeDeployed.isEmpty()) {
- LOG.error("Method 'deploy' called but list of vertices to be deployed is empty");
- return;
- }
-
- for (final ExecutionVertex vertex : verticesToBeDeployed) {
-
- // Check vertex state
- if (vertex.getExecutionState() != ExecutionState.READY) {
- LOG.error("Expected vertex " + vertex + " to be in state READY but it is in state "
- + vertex.getExecutionState());
- }
-
- vertex.updateExecutionState(ExecutionState.STARTING, null);
- }
-
- // Create a new runnable and pass it the executor service
- final Runnable deploymentRunnable = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
-
- // Check if all required libraries are available on the instance
- try {
- instance.checkLibraryAvailability(jobID);
- } catch (IOException ioe) {
- LOG.error("Cannot check library availability: " + StringUtils.stringifyException(ioe));
- }
-
- final List<TaskDeploymentDescriptor> submissionList = new SerializableArrayList<TaskDeploymentDescriptor>();
-
- // Check the consistency of the call
- for (final ExecutionVertex vertex : verticesToBeDeployed) {
-
- submissionList.add(vertex.constructDeploymentDescriptor());
-
- LOG.info("Starting task " + vertex + " on " + vertex.getAllocatedResource().getInstance());
- }
-
- List<TaskSubmissionResult> submissionResultList = null;
-
- try {
- submissionResultList = instance.submitTasks(submissionList);
- } catch (final IOException ioe) {
- final String errorMsg = StringUtils.stringifyException(ioe);
- for (final ExecutionVertex vertex : verticesToBeDeployed) {
- vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMsg);
- }
- }
-
- if (verticesToBeDeployed.size() != submissionResultList.size()) {
- LOG.error("size of submission result list does not match size of list with vertices to be deployed");
- }
-
- int count = 0;
- for (final TaskSubmissionResult tsr : submissionResultList) {
-
- ExecutionVertex vertex = verticesToBeDeployed.get(count++);
- if (!vertex.getID().equals(tsr.getVertexID())) {
- LOG.error("Expected different order of objects in task result list");
- vertex = null;
- for (final ExecutionVertex candVertex : verticesToBeDeployed) {
- if (tsr.getVertexID().equals(candVertex.getID())) {
- vertex = candVertex;
- break;
- }
- }
-
- if (vertex == null) {
- LOG.error("Cannot find execution vertex for vertex ID " + tsr.getVertexID());
- continue;
- }
- }
-
- if (tsr.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
- // Change the execution state to failed and let the scheduler deal with the rest
- vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, tsr.getDescription());
- }
- }
- }
- };
-
- this.executorService.execute(deploymentRunnable);
- }
-
-
- @Override
- public InputSplitWrapper requestNextInputSplit(final JobID jobID, final ExecutionVertexID vertexID,
- final IntegerRecord sequenceNumber) throws IOException {
-
- final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID);
- if (graph == null) {
- LOG.error("Cannot find execution graph to job ID " + jobID);
- return null;
- }
-
- final ExecutionVertex vertex = graph.getVertexByID(vertexID);
- if (vertex == null) {
- LOG.error("Cannot find execution vertex for vertex ID " + vertexID);
- return null;
- }
-
- return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, sequenceNumber.getValue()));
- }
/**
* Starts the Jetty Infoserver for the Jobmanager
@@ -1081,17 +604,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
- // TODO Add to RPC?
public List<RecentJobEvent> getOldJobs() throws IOException {
-
- //final List<RecentJobEvent> eventList = new SerializableArrayList<RecentJobEvent>();
-
if (this.archive == null) {
throw new IOException("No instance of the event collector found");
}
- //this.eventCollector.getRecentJobs(eventList);
-
return this.archive.getJobs();
}
@@ -1103,8 +620,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return this.instanceManager.getNumberOfRegisteredTaskManagers();
}
- public Map<InstanceConnectionInfo, Instance> getInstances() {
- return this.instanceManager.getInstances();
+ public Map<InstanceID, Instance> getInstances() {
+ return this.instanceManager.getAllRegisteredInstances();
}
@Override
@@ -1120,4 +637,118 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
public Map<String, Accumulator<?, ?>> getAccumulators(JobID jobID) {
return this.accumulatorManager.getJobAccumulators(jobID);
}
+
+ public Map<JobID, ExecutionGraph> getCurrentJobs() {
+ return Collections.unmodifiableMap(currentJobs);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // TaskManager to JobManager communication
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean sendHeartbeat(InstanceID taskManagerId) {
+ return this.instanceManager.reportHeartBeat(taskManagerId);
+ }
+
+ @Override
+ public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
+ return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // Executable
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Entry point for the program
+ *
+ * @param args
+ * arguments from the command line
+ */
+
+ public static void main(String[] args) {
+ // determine if a valid log4j config exists and initialize a default logger if not
+ if (System.getProperty("log4j.configuration") == null) {
+ Logger root = Logger.getRootLogger();
+ root.removeAllAppenders();
+ PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
+ ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
+ root.addAppender(appender);
+ root.setLevel(Level.INFO);
+ }
+
+ JobManager jobManager;
+ try {
+ jobManager = initialize(args);
+ // Start info server for jobmanager
+ jobManager.startInfoServer();
+ }
+ catch (Exception e) {
+ LOG.fatal(e.getMessage(), e);
+ System.exit(FAILURE_RETURN_CODE);
+ }
+
+ // Clean up is triggered through a shutdown hook
+ // freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
+ Object w = new Object();
+ synchronized (w) {
+ try {
+ w.wait();
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ @SuppressWarnings("static-access")
+ public static JobManager initialize(String[] args) throws Exception {
+ final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
+ .withDescription("Specify configuration directory.").create("configDir");
+
+ final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg()
+ .withDescription("Specify execution mode.").create("executionMode");
+
+ final Options options = new Options();
+ options.addOption(configDirOpt);
+ options.addOption(executionModeOpt);
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine line = null;
+ try {
+ line = parser.parse(options, args);
+ } catch (ParseException e) {
+ LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
+ System.exit(FAILURE_RETURN_CODE);
+ }
+
+ final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
+ final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local");
+
+ ExecutionMode executionMode = null;
+ if ("local".equals(executionModeName)) {
+ executionMode = ExecutionMode.LOCAL;
+ } else if ("cluster".equals(executionModeName)) {
+ executionMode = ExecutionMode.CLUSTER;
+ } else {
+ System.err.println("Unrecognized execution mode: " + executionModeName);
+ System.exit(FAILURE_RETURN_CODE);
+ }
+
+ // print some startup environment info, like user, code revision, etc
+ EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
+
+ // First, try to load global configuration
+ GlobalConfiguration.loadConfiguration(configDir);
+
+ // Create a new job manager object
+ JobManager jobManager = new JobManager(executionMode);
+
+ // Set base dir for info server
+ Configuration infoserverConfig = GlobalConfiguration.getConfiguration();
+ if (configDir != null && new File(configDir).isDirectory()) {
+ infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/..");
+ }
+ GlobalConfiguration.includeConfiguration(infoserverConfig);
+ return jobManager;
+ }
}