You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:39 UTC
[15/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 82600d2..2d00f40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -16,81 +16,63 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.deployment;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.StringValue;
/**
* A task deployment descriptor contains all the information necessary to deploy a task on a task manager.
- * <p>
- * This class is not thread-safe in general.
- *
*/
public final class TaskDeploymentDescriptor implements IOReadableWritable {
- /**
- * The ID of the job the tasks belongs to.
- */
+ /** The ID of the job the task belongs to. */
private final JobID jobID;
- /**
- * The task's execution vertex ID.
- */
- private final ExecutionVertexID vertexID;
+ /** The task's job vertex ID. */
+ private final JobVertexID vertexID;
+
+ /** The ID referencing the attempt to execute the task. */
+ private final ExecutionAttemptID executionId;
- /**
- * The task's name.
- */
+ /** The task's name. */
private String taskName;
- /**
- * The task's index in the subtask group.
- */
+ /** The task's index in the subtask group. */
private int indexInSubtaskGroup;
- /**
- * The current number of subtasks.
- */
+ /** The current number of subtasks. */
private int currentNumberOfSubtasks;
- /**
- * The configuration of the job the task belongs to.
- */
+ /** The configuration of the job the task belongs to. */
private Configuration jobConfiguration;
- /**
- * The task's configuration object.
- */
+ /** The task's configuration object. */
private Configuration taskConfiguration;
+ /** The name of the class containing the task code to be executed. */
+ private String invokableClassName;
- /**
- * The class containing the task code to be executed.
- */
- private Class<? extends AbstractInvokable> invokableClass;
+ /** The list of output gate deployment descriptors. */
+ private List<GateDeploymentDescriptor> outputGates;
- /**
- * The list of output gate deployment descriptors.
- */
- private final SerializableArrayList<GateDeploymentDescriptor> outputGates;
-
- /**
- * The list of input gate deployment descriptors.
- */
- private final SerializableArrayList<GateDeploymentDescriptor> inputGates;
+ /** The list of input gate deployment descriptors. */
+ private List<GateDeploymentDescriptor> inputGates;
+
+ private String[] requiredJarFiles;
+
+ private int targetSlotNumber;
/**
* Constructs a task deployment descriptor.
@@ -109,167 +91,58 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* the configuration of the job the task belongs to
* @param taskConfiguration
* the task's configuration object
- * @param invokableClass
+ * @param invokableClassName
* the name of the class containing the task code to be executed
* @param outputGates
* list of output gate deployment descriptors
- * @param inputGateIDs
- * list of input gate deployment descriptors
*/
- public TaskDeploymentDescriptor(final JobID jobID, final ExecutionVertexID vertexID, final String taskName,
- final int indexInSubtaskGroup, final int currentNumberOfSubtasks, final Configuration jobConfiguration,
- final Configuration taskConfiguration,
- final Class<? extends AbstractInvokable> invokableClass,
- final SerializableArrayList<GateDeploymentDescriptor> outputGates,
- final SerializableArrayList<GateDeploymentDescriptor> inputGates) {
-
- if (jobID == null) {
- throw new IllegalArgumentException("Argument jobID must not be null");
- }
-
- if (vertexID == null) {
- throw new IllegalArgumentException("Argument vertexID must not be null");
- }
-
- if (taskName == null) {
- throw new IllegalArgumentException("Argument taskName must not be null");
- }
-
- if (indexInSubtaskGroup < 0) {
- throw new IllegalArgumentException("Argument indexInSubtaskGroup must not be smaller than zero");
+ public TaskDeploymentDescriptor(JobID jobID, JobVertexID vertexID, ExecutionAttemptID execuionId,
+ String taskName, int indexInSubtaskGroup, int currentNumberOfSubtasks,
+ Configuration jobConfiguration, Configuration taskConfiguration,
+ String invokableClassName,
+ List<GateDeploymentDescriptor> outputGates,
+ List<GateDeploymentDescriptor> inputGates,
+ String[] requiredJarFiles,
+ int targetSlotNumber)
+ {
+ if (jobID == null || vertexID == null || execuionId == null || taskName == null || indexInSubtaskGroup < 0 ||
+ currentNumberOfSubtasks <= indexInSubtaskGroup || jobConfiguration == null ||
+ taskConfiguration == null || invokableClassName == null || outputGates == null || inputGates == null)
+ {
+ throw new IllegalArgumentException();
}
-
- if (currentNumberOfSubtasks < indexInSubtaskGroup) {
- throw new IllegalArgumentException(
- "Argument currentNumberOfSubtasks must not be smaller than argument indexInSubtaskGroup");
- }
-
- if (jobConfiguration == null) {
- throw new IllegalArgumentException("Argument jobConfiguration must not be null");
- }
-
- if (taskConfiguration == null) {
- throw new IllegalArgumentException("Argument taskConfiguration must not be null");
- }
-
- if (invokableClass == null) {
- throw new IllegalArgumentException("Argument invokableClass must not be null");
- }
-
- if (outputGates == null) {
- throw new IllegalArgumentException("Argument outputGates must not be null");
- }
-
- if (inputGates == null) {
- throw new IllegalArgumentException("Argument inputGates must not be null");
- }
-
+
this.jobID = jobID;
this.vertexID = vertexID;
+ this.executionId = execuionId;
this.taskName = taskName;
this.indexInSubtaskGroup = indexInSubtaskGroup;
this.currentNumberOfSubtasks = currentNumberOfSubtasks;
this.jobConfiguration = jobConfiguration;
this.taskConfiguration = taskConfiguration;
- this.invokableClass = invokableClass;
+ this.invokableClassName = invokableClassName;
this.outputGates = outputGates;
this.inputGates = inputGates;
+ this.requiredJarFiles = requiredJarFiles == null ? new String[0] : requiredJarFiles;
+ this.targetSlotNumber = targetSlotNumber;
}
/**
* Default constructor for serialization/deserialization.
*/
public TaskDeploymentDescriptor() {
-
this.jobID = new JobID();
- this.vertexID = new ExecutionVertexID();
- this.taskName = null;
- this.indexInSubtaskGroup = 0;
- this.currentNumberOfSubtasks = 0;
+ this.vertexID = new JobVertexID();
+ this.executionId = new ExecutionAttemptID();
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
- this.invokableClass = null;
- this.outputGates = new SerializableArrayList<GateDeploymentDescriptor>();
- this.inputGates = new SerializableArrayList<GateDeploymentDescriptor>();
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
-
- this.jobID.write(out);
- this.vertexID.write(out);
- StringRecord.writeString(out, this.taskName);
- out.writeInt(this.indexInSubtaskGroup);
- out.writeInt(this.currentNumberOfSubtasks);
-
- // Write out the names of the required jar files
- final String[] requiredJarFiles = LibraryCacheManager.getRequiredJarFiles(this.jobID);
-
- out.writeInt(requiredJarFiles.length);
- for (int i = 0; i < requiredJarFiles.length; i++) {
- StringRecord.writeString(out, requiredJarFiles[i]);
- }
-
- // Write out the name of the invokable class
- if (this.invokableClass == null) {
- throw new IOException("this.invokableClass is null");
- }
-
- StringRecord.writeString(out, this.invokableClass.getName());
-
- this.jobConfiguration.write(out);
- this.taskConfiguration.write(out);
-
- this.outputGates.write(out);
- this.inputGates.write(out);
+ this.outputGates = Collections.emptyList();
+ this.inputGates = Collections.emptyList();
+ this.requiredJarFiles = new String[0];
}
- @SuppressWarnings("unchecked")
- @Override
- public void read(final DataInputView in) throws IOException {
-
- this.jobID.read(in);
- this.vertexID.read(in);
- this.taskName = StringRecord.readString(in);
- this.indexInSubtaskGroup = in.readInt();
- this.currentNumberOfSubtasks = in.readInt();
-
- // Read names of required jar files
- final String[] requiredJarFiles = new String[in.readInt()];
- for (int i = 0; i < requiredJarFiles.length; i++) {
- requiredJarFiles[i] = StringRecord.readString(in);
- }
-
- // Now register data with the library manager
- LibraryCacheManager.register(this.jobID, requiredJarFiles);
-
- // Get ClassLoader from Library Manager
- final ClassLoader cl = LibraryCacheManager.getClassLoader(this.jobID);
-
- // Read the name of the invokable class;
- final String invokableClassName = StringRecord.readString(in);
-
- if (invokableClassName == null) {
- throw new IOException("invokableClassName is null");
- }
-
- try {
- this.invokableClass = (Class<? extends AbstractInvokable>) Class.forName(invokableClassName, true, cl);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException("Class " + invokableClassName + " not found in one of the supplied jar files: "
- + StringUtils.stringifyException(cnfe));
- }
-
- this.jobConfiguration = new Configuration(cl);
- this.jobConfiguration.read(in);
- this.taskConfiguration = new Configuration(cl);
- this.taskConfiguration.read(in);
- this.outputGates.read(in);
- this.inputGates.read(in);
- }
/**
* Returns the ID of the job the tasks belongs to.
@@ -277,7 +150,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the ID of the job the tasks belongs to
*/
public JobID getJobID() {
-
return this.jobID;
}
@@ -286,10 +158,13 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
*
* @return the task's job vertex ID
*/
- public ExecutionVertexID getVertexID() {
-
+ public JobVertexID getVertexID() {
return this.vertexID;
}
+
+ public ExecutionAttemptID getExecutionId() {
+ return executionId;
+ }
/**
* Returns the task's name.
@@ -297,7 +172,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the task's name
*/
public String getTaskName() {
-
return this.taskName;
}
@@ -307,7 +181,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the task's index in the subtask group
*/
public int getIndexInSubtaskGroup() {
-
return this.indexInSubtaskGroup;
}
@@ -317,7 +190,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the current number of subtasks
*/
public int getCurrentNumberOfSubtasks() {
-
return this.currentNumberOfSubtasks;
}
@@ -327,7 +199,6 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the configuration of the job the tasks belongs to
*/
public Configuration getJobConfiguration() {
-
return this.jobConfiguration;
}
@@ -337,61 +208,97 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
* @return the task's configuration object
*/
public Configuration getTaskConfiguration() {
-
return this.taskConfiguration;
}
/**
- * Returns the class containing the task code to be executed.
+ * Returns the name of the class containing the task code to be executed.
*
- * @return the class containing the task code to be executed
+ * @return The name of the class containing the task code to be executed
*/
- public Class<? extends AbstractInvokable> getInvokableClass() {
-
- return this.invokableClass;
+ public String getInvokableClassName() {
+ return this.invokableClassName;
}
- /**
- * Returns the number of output gate deployment descriptors contained in this task deployment descriptor.
- *
- * @return the number of output gate deployment descriptors
- */
- public int getNumberOfOutputGateDescriptors() {
-
- return this.outputGates.size();
+ public List<GateDeploymentDescriptor> getOutputGates() {
+ return outputGates;
}
-
- /**
- * Returns the output gate descriptor with the given index
- *
- * @param index
- * the index if the output gate descriptor to return
- * @return the output gate descriptor with the given index
- */
- public GateDeploymentDescriptor getOutputGateDescriptor(final int index) {
-
- return this.outputGates.get(index);
+
+ public List<GateDeploymentDescriptor> getInputGates() {
+ return inputGates;
}
-
- /**
- * Returns the number of output gate deployment descriptors contained in this task deployment descriptor.
- *
- * @return the number of output gate deployment descriptors
- */
- public int getNumberOfInputGateDescriptors() {
-
- return this.inputGates.size();
+
+ // --------------------------------------------------------------------------------------------
+ // Serialization
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void write(final DataOutputView out) throws IOException {
+ jobID.write(out);
+ vertexID.write(out);
+ executionId.write(out);
+
+ StringValue.writeString(taskName, out);
+ StringValue.writeString(invokableClassName, out);
+
+ out.writeInt(indexInSubtaskGroup);
+ out.writeInt(currentNumberOfSubtasks);
+ out.writeInt(targetSlotNumber);
+
+ jobConfiguration.write(out);
+ taskConfiguration.write(out);
+
+ writeGateList(inputGates, out);
+ writeGateList(outputGates, out);
+
+ out.writeInt(requiredJarFiles.length);
+ for (int i = 0; i < requiredJarFiles.length; i++) {
+ StringValue.writeString(requiredJarFiles[i], out);
+ }
}
- /**
- * Returns the input gate descriptor with the given index
- *
- * @param index
- * the index if the input gate descriptor to return
- * @return the input gate descriptor with the given index
- */
- public GateDeploymentDescriptor getInputGateDescriptor(final int index) {
-
- return this.inputGates.get(index);
+ /**
+ * Restores this descriptor from the given input view. The fields are read in exactly the
+ * order in which {@link #write(DataOutputView)} emits them: the three IDs, task name and
+ * invokable class name, the subtask index / subtask count / target slot number, the job and
+ * task configurations, the input and output gate lists, and finally the required jar files.
+ *
+ * @param in
+ *        the input view to read the serialized descriptor from
+ * @throws IOException
+ *         thrown if reading from the input view fails
+ */
+ @Override
+ public void read(DataInputView in) throws IOException {
+ jobID.read(in);
+ vertexID.read(in);
+ executionId.read(in);
+
+ taskName = StringValue.readString(in);
+ invokableClassName = StringValue.readString(in);
+
+ indexInSubtaskGroup = in.readInt();
+ currentNumberOfSubtasks = in.readInt();
+ targetSlotNumber = in.readInt();
+
+ jobConfiguration.read(in);
+ taskConfiguration.read(in);
+
+ inputGates = readGateList(in);
+ outputGates = readGateList(in);
+
+ String[] jarFiles = new String[in.readInt()];
+ for (int i = 0; i < jarFiles.length; i++) {
+ jarFiles[i] = StringValue.readString(in);
+ }
+ // Keep the deserialized jar file names; without this assignment they were read and then
+ // discarded, leaving the empty array set by the default constructor.
+ requiredJarFiles = jarFiles;
+ }
+
+ private static final void writeGateList(List<GateDeploymentDescriptor> list, DataOutputView out) throws IOException {
+ out.writeInt(list.size());
+ for (GateDeploymentDescriptor gdd : list) {
+ gdd.write(out);
+ }
+ }
+
+ private static final List<GateDeploymentDescriptor> readGateList(DataInputView in) throws IOException {
+ final int len = in.readInt();
+ ArrayList<GateDeploymentDescriptor> list = new ArrayList<GateDeploymentDescriptor>(len);
+
+ for (int i = 0; i < len; i++) {
+ GateDeploymentDescriptor gdd = new GateDeploymentDescriptor();
+ gdd.read(in);
+ list.add(gdd);
+ }
+
+ return list;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
index 61970da..d7e4712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/AbstractEvent.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
import java.io.IOException;
@@ -30,29 +29,20 @@ import org.apache.flink.core.memory.DataOutputView;
/**
* An abstract event is transmitted from the job manager to the
- * job client in order to inform the user about the job progress.
- *
+ * job client in order to inform the user about the job progress.
*/
public abstract class AbstractEvent implements IOReadableWritable {
- /**
- * Static variable that points to the current global sequence number
- */
+ /** Static variable that points to the current global sequence number */
private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0);
- /**
- * Auxiliary object which helps to convert a {@link Date} object to the given string representation.
- */
+ /** Auxiliary object which helps to convert a {@link Date} object to the given string representation. */
private static final SimpleDateFormat DATA_FORMATTER = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
- /**
- * The timestamp of the event.
- */
+ /** The timestamp of the event. */
private long timestamp = -1;
- /**
- * The sequence number of the event.
- */
+ /** The sequence number of the event. */
private long sequenceNumber = -1;
/**
@@ -61,7 +51,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
* @param timestamp
* the timestamp of the event.
*/
- public AbstractEvent(final long timestamp) {
+ public AbstractEvent(long timestamp) {
this.timestamp = timestamp;
this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet();
}
@@ -75,23 +65,17 @@ public abstract class AbstractEvent implements IOReadableWritable {
* is required for the deserialization process and is not
* supposed to be called directly.
*/
- public AbstractEvent() {
- }
+ public AbstractEvent() {}
@Override
- public void read(final DataInputView in) throws IOException {
-
- // Read the timestamp
+ public void read(DataInputView in) throws IOException {
this.timestamp = in.readLong();
this.sequenceNumber = in.readLong();
}
-
@Override
- public void write(final DataOutputView out) throws IOException {
-
- // Write the timestamp
+ public void write(DataOutputView out) throws IOException {
out.writeLong(this.timestamp);
out.writeLong(this.sequenceNumber);
}
@@ -113,18 +97,14 @@ public abstract class AbstractEvent implements IOReadableWritable {
* the timestamp in milliseconds since the beginning of "the epoch"
* @return the string unified representation of the timestamp
*/
- public static String timestampToString(final long timestamp) {
-
+ public static String timestampToString(long timestamp) {
return DATA_FORMATTER.format(new Date(timestamp));
-
}
@Override
public boolean equals(final Object obj) {
-
if (obj instanceof AbstractEvent) {
-
final AbstractEvent abstractEvent = (AbstractEvent) obj;
if (this.timestamp == abstractEvent.getTimestamp()) {
return true;
@@ -137,7 +117,6 @@ public abstract class AbstractEvent implements IOReadableWritable {
@Override
public int hashCode() {
-
- return (int) (this.timestamp % Integer.MAX_VALUE);
+ return (int) (this.timestamp ^ (this.timestamp >>> 32));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
index bb55eac..15bae60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ExecutionStateChangeEvent.java
@@ -23,13 +23,11 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-import org.apache.flink.runtime.util.EnumUtils;
/**
- * An {@link ExecutionStateChangeEvent} can be used to notify other objects about an execution state change of a vertex.
- *
+ * An {@link ExecutionStateChangeEvent} can be used to notify other objects about an execution state change of a vertex.
*/
public final class ExecutionStateChangeEvent extends AbstractEvent implements ManagementEvent {
@@ -41,7 +39,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
/**
* The new execution state of the vertex this event refers to.
*/
- private ExecutionState newExecutionState;
+ private ExecutionState2 newExecutionState;
/**
* Constructs a new vertex event object.
@@ -53,8 +51,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
* @param newExecutionState
* the new execution state of the vertex this event refers to
*/
- public ExecutionStateChangeEvent(final long timestamp, final ManagementVertexID managementVertexID,
- final ExecutionState newExecutionState) {
+ public ExecutionStateChangeEvent(long timestamp, ManagementVertexID managementVertexID, ExecutionState2 newExecutionState) {
super(timestamp);
this.managementVertexID = managementVertexID;
this.newExecutionState = newExecutionState;
@@ -69,7 +66,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
super();
this.managementVertexID = new ManagementVertexID();
- this.newExecutionState = ExecutionState.CREATED;
+ this.newExecutionState = ExecutionState2.CREATED;
}
/**
@@ -86,28 +83,24 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
*
* @return the new execution state of the vertex this event refers to
*/
- public ExecutionState getNewExecutionState() {
+ public ExecutionState2 getNewExecutionState() {
return this.newExecutionState;
}
@Override
- public void read(final DataInputView in) throws IOException {
-
+ public void read(DataInputView in) throws IOException {
super.read(in);
-
this.managementVertexID.read(in);
- this.newExecutionState = EnumUtils.readEnum(in, ExecutionState.class);
+ this.newExecutionState = ExecutionState2.values()[in.readInt()];
}
@Override
- public void write(final DataOutputView out) throws IOException {
-
+ public void write(DataOutputView out) throws IOException {
super.write(out);
-
this.managementVertexID.write(out);
- EnumUtils.writeEnum(out, this.newExecutionState);
+ out.writeInt(this.newExecutionState.ordinal());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
index 210fb54..2f32686 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/JobEvent.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
import java.io.IOException;
@@ -30,18 +29,13 @@ import org.apache.flink.runtime.util.EnumUtils;
/**
* A job event object is used by the job manager to inform a client about
* changes of the job's status.
- *
*/
public class JobEvent extends AbstractEvent {
- /**
- * The current status of the job.
- */
+ /** The current status of the job. */
private JobStatus currentJobStatus;
- /**
- * An optional message attached to the event, possibly <code>null</code>.
- */
+ /** An optional message attached to the event, possibly <code>null</code>. */
private String optionalMessage = null;
/**
@@ -68,8 +62,7 @@ public class JobEvent extends AbstractEvent {
*/
public JobEvent() {
super();
-
- this.currentJobStatus = JobStatus.SCHEDULED;
+ this.currentJobStatus = JobStatus.CREATED;
}
@@ -111,20 +104,17 @@ public class JobEvent extends AbstractEvent {
* @return the optional message, possibly <code>null</code>.
*/
public String getOptionalMessage() {
-
return this.optionalMessage;
}
public String toString() {
-
return timestampToString(getTimestamp()) + ":\tJob execution switched to status " + this.currentJobStatus;
}
@Override
public boolean equals(final Object obj) {
-
if (!super.equals(obj)) {
return false;
}
@@ -154,7 +144,6 @@ public class JobEvent extends AbstractEvent {
@Override
public int hashCode() {
-
return super.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
index 4b33f47..695ae5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/ManagementEvent.java
@@ -16,14 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
/**
* This is a marker interface only. It marks events which
- * should only be accessible via the {@link ExtendedManagementProtocol}.
- *
+ * should only be accessible via the {@link org.apache.flink.runtime.protocols.ExtendedManagementProtocol}.
*/
-public interface ManagementEvent {
-
-}
+public interface ManagementEvent {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
deleted file mode 100644
index 0e048dd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexAssignmentEvent.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.job;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-
-/**
- * A {@link VertexAssignmentEvent} can be used to notify other objects about changes in the assignment of vertices to
- * instances.
- *
- */
-public final class VertexAssignmentEvent extends AbstractEvent implements ManagementEvent {
-
- /**
- * The ID identifies the vertex this events refers to.
- */
- private ManagementVertexID managementVertexID;
-
- /**
- * The name of the instance the vertex is now assigned to.
- */
- private String instanceName;
-
- /**
- * Constructs a new event.
- *
- * @param timestamp
- * the timestamp of the event
- * @param managementVertexID
- * identifies the vertex this event refers to
- * @param instanceName
- * the name of the instance the vertex is now assigned to
- */
- public VertexAssignmentEvent(final long timestamp, final ManagementVertexID managementVertexID,
- final String instanceName) {
- super(timestamp);
-
- this.managementVertexID = managementVertexID;
- this.instanceName = instanceName;
- }
-
- /**
- * Constructor for serialization/deserialization. Should not be called on other occasions.
- */
- public VertexAssignmentEvent() {
- super();
-
- this.managementVertexID = new ManagementVertexID();
- }
-
- /**
- * Returns the ID of the vertex this event refers to.
- *
- * @return the ID of the vertex this event refers to
- */
- public ManagementVertexID getVertexID() {
- return this.managementVertexID;
- }
-
- /**
- * Returns the name of the instance the vertex is now assigned to.
- *
- * @return the name of the instance the vertex is now assigned to
- */
- public String getInstanceName() {
- return this.instanceName;
- }
-
- @Override
- public void read(final DataInputView in) throws IOException {
-
- super.read(in);
-
- this.managementVertexID.read(in);
- this.instanceName = StringRecord.readString(in);
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
-
- super.write(out);
-
- this.managementVertexID.write(out);
- StringRecord.writeString(out, this.instanceName);
- }
-
-
- @Override
- public boolean equals(final Object obj) {
-
- if (!super.equals(obj)) {
- return false;
- }
-
- if (!(obj instanceof VertexAssignmentEvent)) {
- return false;
- }
-
- final VertexAssignmentEvent vae = (VertexAssignmentEvent) obj;
-
- if (!this.managementVertexID.equals(vae.getVertexID())) {
- return false;
- }
-
- if (this.instanceName == null) {
- if (vae.getInstanceName() != null) {
- return false;
- }
- } else {
- if (!this.instanceName.equals(vae.getInstanceName())) {
- return false;
- }
- }
-
- return true;
- }
-
-
- @Override
- public int hashCode() {
-
- if (this.managementVertexID != null) {
- return this.managementVertexID.hashCode();
- }
-
- return super.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
index 69ab16d..fb3b247 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/job/VertexEvent.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.job;
import java.io.IOException;
@@ -24,46 +23,31 @@ import java.io.IOException;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.ExecutionState2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.util.EnumUtils;
/**
- * Vertex events are transmitted from the job manager
- * to the job client in order to inform the user about
+ * Vertex events are transmitted from the job manager to the job client in order to inform the user about
* changes in terms of a task's execution state.
- *
*/
public class VertexEvent extends AbstractEvent {
- /**
- * The ID of the job vertex this event belongs to.
- */
+ /** The ID of the job vertex this event belongs to. */
private JobVertexID jobVertexID;
- /**
- * The name of the job vertex this event belongs to.
- */
+ /** The name of the job vertex this event belongs to. */
private String jobVertexName;
- /**
- * The number of subtasks the corresponding vertex has been split into at runtime.
- */
+ /** The number of subtasks the corresponding vertex has been split into at runtime. */
private int totalNumberOfSubtasks;
- /**
- * The index of the subtask that this event belongs to.
- */
+ /** The index of the subtask that this event belongs to. */
private int indexOfSubtask;
- /**
- * The current execution state of the subtask this event belongs to.
- */
- private ExecutionState currentExecutionState;
+ /** The current execution state of the subtask this event belongs to. */
+ private ExecutionState2 currentExecutionState;
- /**
- * An optional more detailed description of the event.
- */
+ /** An optional more detailed description of the event. */
private String description;
/**
@@ -84,10 +68,12 @@ public class VertexEvent extends AbstractEvent {
* @param description
* an optional description
*/
- public VertexEvent(final long timestamp, final JobVertexID jobVertexID, final String jobVertexName,
- final int totalNumberOfSubtasks, final int indexOfSubtask, final ExecutionState currentExecutionState,
- final String description) {
+ public VertexEvent(long timestamp, JobVertexID jobVertexID, String jobVertexName,
+ int totalNumberOfSubtasks, int indexOfSubtask, ExecutionState2 currentExecutionState,
+ String description)
+ {
super(timestamp);
+
this.jobVertexID = jobVertexID;
this.jobVertexName = jobVertexName;
this.totalNumberOfSubtasks = totalNumberOfSubtasks;
@@ -108,7 +94,7 @@ public class VertexEvent extends AbstractEvent {
this.jobVertexName = null;
this.totalNumberOfSubtasks = -1;
this.indexOfSubtask = -1;
- this.currentExecutionState = ExecutionState.CREATED;
+ this.currentExecutionState = ExecutionState2.CREATED;
this.description = null;
}
@@ -122,7 +108,7 @@ public class VertexEvent extends AbstractEvent {
this.jobVertexName = StringRecord.readString(in);
this.totalNumberOfSubtasks = in.readInt();
this.indexOfSubtask = in.readInt();
- this.currentExecutionState = EnumUtils.readEnum(in, ExecutionState.class);
+ this.currentExecutionState = ExecutionState2.values()[in.readInt()];
this.description = StringRecord.readString(in);
}
@@ -136,7 +122,7 @@ public class VertexEvent extends AbstractEvent {
StringRecord.writeString(out, this.jobVertexName);
out.writeInt(this.totalNumberOfSubtasks);
out.writeInt(this.indexOfSubtask);
- EnumUtils.writeEnum(out, this.currentExecutionState);
+ out.writeInt(this.currentExecutionState.ordinal());
StringRecord.writeString(out, this.description);
}
@@ -183,7 +169,7 @@ public class VertexEvent extends AbstractEvent {
*
* @return the current execution state of the subtask this event belongs to
*/
- public ExecutionState getCurrentExecutionState() {
+ public ExecutionState2 getCurrentExecutionState() {
return currentExecutionState;
}
@@ -261,7 +247,6 @@ public class VertexEvent extends AbstractEvent {
@Override
public int hashCode() {
-
return super.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
index e18b1eb..22c0620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
@@ -16,17 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.event.task;
import org.apache.flink.core.io.IOReadableWritable;
/**
* This type of event can be used to exchange notification messages between
- * different {@link TaskManager} objects at runtime using the communication
- * channels Nephele has established between different tasks.
- *
+ * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
+ * at runtime using the communication channels.
*/
-public abstract class AbstractEvent implements IOReadableWritable {
-
-}
+public abstract class AbstractEvent implements IOReadableWritable {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 51ba96c..06c9fb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -82,20 +82,6 @@ public interface Environment {
int getIndexInSubtaskGroup();
/**
- * Sends a notification that objects that a new user thread has been started to the execution observer.
- *
- * @param userThread the user thread which has been started
- */
- void userThreadStarted(Thread userThread);
-
- /**
- * Sends a notification that a user thread has finished to the execution observer.
- *
- * @param userThread the user thread which has finished
- */
- void userThreadFinished(Thread userThread);
-
- /**
* Returns the input split provider assigned to this environment.
*
* @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
new file mode 100644
index 0000000..9b39851
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionAttempt.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * An attempt to execute a task for an {@link ExecutionVertex2}.
+ */
+public class ExecutionAttempt implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+
+ private final JobVertexID vertexId;
+
+ private final int subtaskIndex;
+
+ private final ExecutionAttemptID executionId;
+
+ private final int attempt;
+
+ // --------------------------------------------------------------------------------------------
+
+ public ExecutionAttempt(JobVertexID vertexId, int subtaskIndex, ExecutionAttemptID executionId, int attempt) {
+ if (vertexId == null || executionId == null || subtaskIndex < 0 || attempt < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ this.vertexId = vertexId;
+ this.subtaskIndex = subtaskIndex;
+ this.executionId = executionId;
+ this.attempt = attempt;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public JobVertexID getVertexId() {
+ return vertexId;
+ }
+
+ public int getSubtaskIndex() {
+ return subtaskIndex;
+ }
+
+ public ExecutionAttemptID getExecutionId() {
+ return executionId;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return vertexId.hashCode() +
+ executionId.hashCode() +
+ 31 * subtaskIndex +
+ 17 * attempt;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ExecutionAttempt) {
+ ExecutionAttempt other = (ExecutionAttempt) obj;
+ return this.executionId.equals(other.executionId) &&
+ this.vertexId.equals(other.vertexId) &&
+ this.subtaskIndex == other.subtaskIndex &&
+ this.attempt == other.attempt;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ExecutionAttempt (vertex=%s, subtask=%d, executionAttemptId=%s, attempt=%d)",
+ vertexId, subtaskIndex, executionId, attempt);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
index ded630f..b08c847 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
@@ -18,55 +18,16 @@
package org.apache.flink.runtime.execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
/**
- * This interface must be implemented by classes which should be able to receive notifications about
+ * Implementing this interface allows classes to receive notifications about
* changes of a task's execution state.
*/
public interface ExecutionListener {
- /**
- * Called when the execution state of the associated task has changed. It is important to point out that multiple
- * execution listeners can be invoked as a reaction to a state change, according to their priority. As a result, the
- * value of <code>newExecutionState</code> may be out-dated by the time a particular execution listener is called.
- * To determine the most recent state of the respective task, it is recommended to store a reference on the
- * execution that represents it and then call <code>getExecutionState()</code> on the vertex within this method.
- *
- * @param jobID
- * the ID of the job the task belongs to
- * @param vertexID
- * the ID of the task whose state has changed
- * @param newExecutionState
- * the execution state the task has just switched to
- * @param optionalMessage
- * an optional message providing further information on the state change
- */
- void executionStateChanged(JobID jobID, ExecutionVertexID vertexID, ExecutionState newExecutionState,
- String optionalMessage);
-
- /**
- * Called when the user task has started a new thread.
- *
- * @param jobID
- * the ID of the job the task belongs to
- * @param vertexID
- * the ID of the task that started of new thread
- * @param userThread
- * the user thread which has been started
- */
- void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread);
-
- /**
- * Called when a thread spawn by a user task has finished.
- *
- * @param jobID
- * the ID of the job the task belongs to
- * @param vertexID
- * the ID of the task whose thread has finished
- * @param userThread
- * the user thread which has finished
- */
- void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread);
+ void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
+ ExecutionState2 newExecutionState, String optionalMessage);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
index c3b9b72..20a6180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionObserver.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.execution;
public interface ExecutionObserver {
@@ -29,23 +28,7 @@ public interface ExecutionObserver {
* @param optionalMessage
* an optional message providing further information on the state change
*/
- void executionStateChanged(ExecutionState newExecutionState, String optionalMessage);
-
- /**
- * Called when the user task has started a new thread.
- *
- * @param userThread
- * the user thread which has been started
- */
- void userThreadStarted(Thread userThread);
-
- /**
- * Called when a thread spawn by a user task has finished.
- *
- * @param userThread
- * the user thread which has finished
- */
- void userThreadFinished(Thread userThread);
+ void executionStateChanged(ExecutionState2 newExecutionState, String optionalMessage);
/**
* Returns whether the task has been canceled.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
new file mode 100644
index 0000000..c2b2070
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState2.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+public enum ExecutionState2 {
+
+ CREATED,
+
+ SCHEDULED,
+
+ DEPLOYING,
+
+ RUNNING,
+
+ FINISHED,
+
+ CANCELING,
+
+ CANCELED,
+
+ FAILED
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
deleted file mode 100644
index 98557f9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.execution;
-
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
-import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
-import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is a utility class to check the consistency of Nephele's execution state model.
- *
- */
-public final class ExecutionStateTransition {
-
- /**
- * The log object used for debugging.
- */
- private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateTransition.class);
-
- /**
- * Private constructor to prevent instantiation of object.
- */
- private ExecutionStateTransition() {
- }
-
- /**
- * Checks the transition of the execution state and outputs an error in case of an unexpected state transition.
- *
- * @param jobManager
- * <code>true</code> to indicate the method is called by the job manager,
- * <code>false/<code> to indicate it is called by a task manager
- * @param taskName
- * the name of the task whose execution has changed
- * @param oldState
- * the old execution state
- * @param newState
- * the new execution state
- */
- public static void checkTransition(boolean jobManager, String taskName, ExecutionState oldState, ExecutionState newState) {
-
- LOG.info((jobManager ? "JM: " : "TM: ") + "ExecutionState set from " + oldState + " to " + newState + " for task " + taskName);
-
- boolean unexpectedStateChange = true;
-
- // This is the regular life cycle of a task
- if (oldState == ExecutionState.CREATED && newState == ExecutionState.SCHEDULED) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.SCHEDULED && newState == ExecutionState.ASSIGNED) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.ASSIGNED && newState == ExecutionState.READY) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.READY && newState == ExecutionState.STARTING) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.STARTING && newState == ExecutionState.RUNNING) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.RUNNING && newState == ExecutionState.FINISHING) {
- unexpectedStateChange = false;
- }
- else if (oldState == ExecutionState.FINISHING && newState == ExecutionState.FINISHED) {
- unexpectedStateChange = false;
- }
-
- // A vertex might skip the SCHEDULED state if its resource has been allocated in a previous stage.
- else if (oldState == ExecutionState.CREATED && newState == ExecutionState.ASSIGNED) {
- unexpectedStateChange = false;
- }
-
- // This transition can appear if a task in a stage which is not yet executed gets canceled.
- else if (oldState == ExecutionState.SCHEDULED && newState == ExecutionState.CANCELING) {
- unexpectedStateChange = false;
- }
-
- // This transition can appear if a task in a stage which is not yet executed gets canceled.
- else if (oldState == ExecutionState.ASSIGNED && newState == ExecutionState.CANCELING) {
- unexpectedStateChange = false;
- }
-
- // This transition can appear if a task is canceled that is not yet running on the task manager.
- else if (oldState == ExecutionState.READY && newState == ExecutionState.CANCELING) {
- unexpectedStateChange = false;
- }
-
- // -------------- error cases --------------
- else if (newState == FAILED || newState == CANCELED || newState == CANCELING) {
- // any state may fail or cancel itself
- unexpectedStateChange = false;
- }
-
- if (unexpectedStateChange) {
- LOG.error("Unexpected state change: " + oldState + " -> " + newState);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 6bfaf2a..cb7b290 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -16,16 +16,29 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.execution;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
@@ -43,177 +56,139 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.StringUtils;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.FutureTask;
+import com.google.common.base.Preconditions;
+
-/**
- * The user code of every Nephele task runs inside a <code>RuntimeEnvironment</code> object. The environment provides
- * important services to the task. It keeps track of setting up the communication channels and provides access to input
- * splits, memory manager, etc.
- * <p/>
- * This class is thread-safe.
- */
public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
- /**
- * The log object used for debugging.
- */
- private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
+ /** The log object used for debugging. */
+ private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);
- /**
- * The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds).
- */
+ /** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
private static final int SLEEPINTERVAL = 100;
+
+ // --------------------------------------------------------------------------------------------
- /**
- * List of output gates created by the task.
- */
- private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
-
- /**
- * List of input gates created by the task.
- */
- private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
-
- /**
- * Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
- * call.
- */
- private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
-
- /**
- * The memory manager of the current environment (currently the one associated with the executing TaskManager).
- */
- private final MemoryManager memoryManager;
-
- /**
- * The io manager of the current environment (currently the one associated with the executing TaskManager).
- */
- private final IOManager ioManager;
+ /** The task that owns this environment */
+ private final Task owner;
+
+
+ /** The job configuration encapsulated in the environment object. */
+ private final Configuration jobConfiguration;
- /**
- * Class of the task to run in this environment.
- */
+ /** The task configuration encapsulated in the environment object. */
+ private final Configuration taskConfiguration;
+
+
+ /** ClassLoader for all user code classes */
+ private final ClassLoader userCodeClassLoader;
+
+ /** Class of the task to run in this environment. */
private final Class<? extends AbstractInvokable> invokableClass;
- /**
- * Instance of the class to be run in this environment.
- */
+ /** Instance of the class to be run in this environment. */
private final AbstractInvokable invokable;
+
+
+ /** List of output gates created by the task. */
+ private final ArrayList<OutputGate> outputGates = new ArrayList<OutputGate>();
- /**
- * The ID of the job this task belongs to.
- */
- private final JobID jobID;
+ /** List of input gates created by the task. */
+ private final ArrayList<InputGate<? extends IOReadableWritable>> inputGates = new ArrayList<InputGate<? extends IOReadableWritable>>();
- /**
- * The job configuration encapsulated in the environment object.
- */
- private final Configuration jobConfiguration;
+ /** Unbound input gate IDs which are required for deserializing an environment in the course of an RPC call. */
+ private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
- /**
- * The task configuration encapsulated in the environment object.
- */
- private final Configuration taskConfiguration;
+ /** The memory manager of the current environment (currently the one associated with the executing TaskManager). */
+ private final MemoryManager memoryManager;
- /**
- * The input split provider that can be queried for new input splits.
- */
+ /** The I/O manager of the current environment (currently the one associated with the executing TaskManager). */
+ private final IOManager ioManager;
+
+ /** The input split provider that can be queried for new input splits. */
private final InputSplitProvider inputSplitProvider;
- /**
- * The observer object for the task's execution.
- */
- private volatile ExecutionObserver executionObserver = null;
- /**
- * The thread executing the task in the environment.
- */
+ /** The thread executing the task in the environment. */
private volatile Thread executingThread;
/**
* The RPC proxy to report accumulators to JobManager
*/
- private AccumulatorProtocol accumulatorProtocolProxy = null;
-
- /**
- * The index of this subtask in the subtask group.
- */
- private final int indexInSubtaskGroup;
-
- /**
- * The current number of subtasks the respective task is split into.
- */
- private final int currentNumberOfSubtasks;
-
- /**
- * The name of the task running in this environment.
- */
- private final String taskName;
+ private final AccumulatorProtocol accumulatorProtocolProxy;
+ private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+
private LocalBufferPool outputBufferPool;
-
- private final Map<String,FutureTask<Path>> cacheCopyTasks;
- private volatile boolean canceled;
+ private AtomicBoolean canceled = new AtomicBoolean();
+
+
+ public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
+ ClassLoader userCodeClassLoader,
+ MemoryManager memoryManager, IOManager ioManager,
+ InputSplitProvider inputSplitProvider,
+ AccumulatorProtocol accumulatorProtocolProxy)
+ throws Exception
+ {
+ Preconditions.checkNotNull(owner);
+ Preconditions.checkNotNull(memoryManager);
+ Preconditions.checkNotNull(ioManager);
+ Preconditions.checkNotNull(inputSplitProvider);
+ Preconditions.checkNotNull(accumulatorProtocolProxy);
+ Preconditions.checkNotNull(userCodeClassLoader);
+
+ this.owner = owner;
- /**
- * Constructs a runtime environment from a task deployment description.
- *
- * @param tdd
- * the task deployment description
- * @param memoryManager
- * the task manager's memory manager component
- * @param ioManager
- * the task manager's I/O manager component
- * @param inputSplitProvider
- * the input split provider for this environment
- * @throws Exception
- * thrown if an error occurs while instantiating the invokable class
- */
- public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
- final MemoryManager memoryManager, final IOManager ioManager,
- final InputSplitProvider inputSplitProvider,
- AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
-
- this.jobID = tdd.getJobID();
- this.taskName = tdd.getTaskName();
- this.invokableClass = tdd.getInvokableClass();
- this.jobConfiguration = tdd.getJobConfiguration();
- this.taskConfiguration = tdd.getTaskConfiguration();
- this.indexInSubtaskGroup = tdd.getIndexInSubtaskGroup();
- this.currentNumberOfSubtasks = tdd.getCurrentNumberOfSubtasks();
this.memoryManager = memoryManager;
this.ioManager = ioManager;
this.inputSplitProvider = inputSplitProvider;
this.accumulatorProtocolProxy = accumulatorProtocolProxy;
- this.cacheCopyTasks = cpTasks;
- this.invokable = this.invokableClass.newInstance();
+ // load and instantiate the invokable class
+ this.userCodeClassLoader = userCodeClassLoader;
+ try {
+ final String className = tdd.getInvokableClassName();
+ this.invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
+ }
+ catch (Throwable t) {
+ throw new Exception("Could not load invokable class.", t);
+ }
+
+ try {
+ this.invokable = this.invokableClass.newInstance();
+ }
+ catch (Throwable t) {
+ throw new Exception("Could not instantiate the invokable class.", t);
+ }
+
+ this.jobConfiguration = tdd.getJobConfiguration();
+ this.taskConfiguration = tdd.getTaskConfiguration();
+
this.invokable.setEnvironment(this);
this.invokable.registerInputOutput();
- int numOutputGates = tdd.getNumberOfOutputGateDescriptors();
-
- for (int i = 0; i < numOutputGates; ++i) {
- this.outputGates.get(i).initializeChannels(tdd.getOutputGateDescriptor(i));
+ List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
+ List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
+
+
+ if (this.inputGates.size() != inGates.size()) {
+ throw new Exception("The number of readers created in 'registerInputOutput()' "
+ + "is different than the number of connected incoming edges in the job graph.");
}
-
- int numInputGates = tdd.getNumberOfInputGateDescriptors();
-
- for (int i = 0; i < numInputGates; i++) {
- this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
+ if (this.outputGates.size() != outGates.size()) {
+ throw new Exception("The number of writers created in 'registerInputOutput()' "
+ + "is different than the number of connected outgoing edges in the job graph.");
+ }
+
+ for (int i = 0; i < inGates.size(); i++) {
+ this.inputGates.get(i).initializeChannels(inGates.get(i));
+ }
+ for (int i = 0; i < outGates.size(); i++) {
+ this.outputGates.get(i).initializeChannels(outGates.get(i));
}
}
@@ -228,7 +203,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public JobID getJobID() {
- return this.jobID;
+ return this.owner.getJobID();
}
@Override
@@ -246,30 +221,24 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public void run() {
- if (invokable == null) {
- LOG.error("ExecutionEnvironment has no Invokable set");
- }
- // Now the actual program starts to run
- changeExecutionState(ExecutionState.RUNNING, null);
-
- // If the task has been canceled in the mean time, do not even start it
- if (this.executionObserver.isCanceled()) {
- changeExecutionState(ExecutionState.CANCELED, null);
+ // quick fail in case the task was cancelled while the thread was started
+ if (owner.isCanceled()) {
+ owner.cancelingDone();
return;
}
-
+
try {
- ClassLoader cl = LibraryCacheManager.getClassLoader(jobID);
- Thread.currentThread().setContextClassLoader(cl);
+ Thread.currentThread().setContextClassLoader(userCodeClassLoader);
this.invokable.invoke();
// Make sure, we enter the catch block when the task has been canceled
- if (this.executionObserver.isCanceled()) {
- throw new InterruptedException();
+ if (this.owner.isCanceled()) {
+ throw new CancelTaskException();
}
- } catch (Throwable t) {
- if (!this.executionObserver.isCanceled()) {
+ }
+ catch (Throwable t) {
+ if (!this.owner.isCanceled()) {
// Perform clean up when the task failed and has not been canceled by the user
try {
@@ -282,19 +251,16 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
- if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
- changeExecutionState(ExecutionState.CANCELED, null);
+ if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+ this.owner.cancelingDone();
}
else {
- changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
+ this.owner.markFailed(t);
}
return;
}
- // Task finished running, but there may be unconsumed output data in some of the channels
- changeExecutionState(ExecutionState.FINISHING, null);
-
try {
// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
closeInputGates();
@@ -307,16 +273,16 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Now we wait until all output channels have written out their data and are closed
waitForOutputChannelsToBeClosed();
- } catch (Throwable t) {
-
+ }
+ catch (Throwable t) {
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
- if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
- changeExecutionState(ExecutionState.CANCELED, null);
+ if (this.owner.isCanceled() || t instanceof CancelTaskException) {
+ this.owner.cancelingDone();
}
else {
- changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
+ this.owner.markFailed(t);
}
return;
@@ -326,7 +292,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
releaseAllChannelResources();
// Finally, switch execution state to FINISHED and report to job manager
- changeExecutionState(ExecutionState.FINISHED, null);
+ if (!owner.markAsFinished()) {
+ owner.markFailed(new Exception());
+ }
}
@Override
@@ -403,12 +371,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
synchronized (this) {
if (this.executingThread == null) {
- if (this.taskName == null) {
- this.executingThread = new Thread(this);
- }
- else {
- this.executingThread = new Thread(this, getTaskNameWithIndex());
- }
+ String name = owner.getTaskNameWithSubtasks();
+ this.executingThread = new Thread(this, name);
}
return this.executingThread;
@@ -416,9 +380,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
public void cancelExecution() {
- canceled = true;
+ if (!canceled.compareAndSet(false, true)) {
+ return;
+ }
- LOG.info("Canceling " + getTaskNameWithIndex());
+ LOG.info("Canceling " + owner.getTaskNameWithSubtasks());
// Request user code to shut down
if (this.invokable != null) {
@@ -429,6 +395,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
}
+ final Thread executingThread = this.executingThread;
+
// interrupt the running thread and wait for it to die
executingThread.interrupt();
@@ -442,10 +410,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
// Continuously interrupt the user thread until it changed to state CANCELED
while (executingThread != null && executingThread.isAlive()) {
- LOG.warn("Task " + getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");
+ LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
if (LOG.isDebugEnabled()) {
- StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");
+ StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
StackTraceElement[] stack = executingThread.getStackTrace();
for (StackTraceElement e : stack) {
@@ -465,12 +433,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Blocks until all output channels are closed.
*
- * @throws IOException thrown if an error occurred while closing the output channels
* @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
*/
private void waitForOutputChannelsToBeClosed() throws InterruptedException {
// If the task has been canceled, return immediately instead of waiting for the channels to close
- if (this.executionObserver.isCanceled()) {
+ if (this.owner.isCanceled()) {
return;
}
@@ -487,10 +454,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
*/
private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
// Wait for disconnection of all output gates
- while (!canceled) {
+ while (!canceled.get()) {
// Make sure, we leave this method with an InterruptedException when the task has been canceled
- if (this.executionObserver.isCanceled()) {
+ if (this.owner.isCanceled()) {
throw new InterruptedException();
}
@@ -554,41 +521,17 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public int getCurrentNumberOfSubtasks() {
- return this.currentNumberOfSubtasks;
+ return owner.getNumberOfSubtasks();
}
@Override
public int getIndexInSubtaskGroup() {
- return this.indexInSubtaskGroup;
- }
-
- private void changeExecutionState(final ExecutionState newExecutionState, final String optionalMessage) {
- if (this.executionObserver != null) {
- this.executionObserver.executionStateChanged(newExecutionState, optionalMessage);
- }
+ return owner.getSubtaskIndex();
}
@Override
public String getTaskName() {
- return this.taskName;
- }
-
- /**
- * Returns the name of the task with its index in the subtask group and the total number of subtasks.
- *
- * @return the name of the task with its index in the subtask group and the total number of subtasks
- */
- public String getTaskNameWithIndex() {
- return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
- }
-
- /**
- * Sets the execution observer for this environment.
- *
- * @param executionObserver the execution observer for this environment
- */
- public void setExecutionObserver(final ExecutionObserver executionObserver) {
- this.executionObserver = executionObserver;
+ return owner.getTaskName();
}
@Override
@@ -596,20 +539,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
return this.inputSplitProvider;
}
- @Override
- public void userThreadStarted(final Thread userThread) {
- if (this.executionObserver != null) {
- this.executionObserver.userThreadStarted(userThread);
- }
- }
-
- @Override
- public void userThreadFinished(final Thread userThread) {
- if (this.executionObserver != null) {
- this.executionObserver.userThreadFinished(userThread);
- }
- }
-
/**
* Releases the allocated resources (particularly buffers) of input and output channels attached to this task. This
* method should only be called after the respective task has stopped running.
@@ -742,6 +671,10 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
return accumulatorProtocolProxy;
}
+ public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) {
+ this.cacheCopyTasks.putAll(copyTasks);
+ }
+
public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
this.cacheCopyTasks.put(name, copyTask);
}
@@ -809,7 +742,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public void logBufferUtilization() {
LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
- getTaskNameWithIndex(),
+ owner.getTaskNameWithSubtasks(),
this.outputBufferPool.numAvailableBuffers(),
this.outputBufferPool.numRequestedBuffers(),
this.outputBufferPool.numDesignatedBuffers()));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 55bf3f0..1076ede 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -271,7 +271,7 @@ public final class LibraryCacheManager {
*
* @param id
* the ID of the job to be registered.
- * @param clientPaths
+ * @param requiredJarFiles
* the client paths of the required libraries
* @throws IOException
* thrown if the library cache manager could not be instantiated or one of the requested libraries is not in
@@ -290,7 +290,7 @@ public final class LibraryCacheManager {
*
* @param id
* the ID of the job to be registered.
- * @param clientPaths
+ * @param requiredJarFiles
* the client paths of the required libraries
* @throws IOException
* thrown if one of the requested libraries is not in the cache
@@ -438,8 +438,6 @@ public final class LibraryCacheManager {
* the ID of the job to return the class loader for
* @return the class loader of requested vertex or <code>null</code> if no class loader has been registered with the
* given ID.
- * @throws IOException
- * thrown if the library cache manager could not be instantiated
*/
private ClassLoader getClassLoaderInternal(final JobID id) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
index 4310bfb..2cfbe58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheUpdate.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.execution.librarycache;
import java.io.IOException;
@@ -26,15 +25,12 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * This class is used to encapsulate the transmission of a library file in a Nephele RPC call.
- *
+ * This class is used to encapsulate the transmission of a library file in an RPC call.
*/
public class LibraryCacheUpdate implements IOReadableWritable {
- /**
- * The name of the library file that is transmitted with this object.
- */
- private String libraryFileName = null;
+ /** The name of the library file that is transmitted with this object. */
+ private String libraryFileName;
/**
* Constructs a new library cache update object.
@@ -42,32 +38,32 @@ public class LibraryCacheUpdate implements IOReadableWritable {
* @param libraryFileName
* the name of the library that should be transported within this object.
*/
- public LibraryCacheUpdate(final String libraryFileName) {
+ public LibraryCacheUpdate(String libraryFileName) {
+ if (libraryFileName == null) {
+ throw new IllegalArgumentException("libraryFileName must not be null");
+ }
+
this.libraryFileName = libraryFileName;
}
/**
* Constructor used to reconstruct the object at the receiver of the RPC call.
*/
- public LibraryCacheUpdate() {
- }
+ public LibraryCacheUpdate() {}
@Override
- public void read(final DataInputView in) throws IOException {
-
+ public void read(DataInputView in) throws IOException {
LibraryCacheManager.readLibraryFromStream(in);
}
@Override
- public void write(final DataOutputView out) throws IOException {
-
+ public void write(DataOutputView out) throws IOException {
if (this.libraryFileName == null) {
throw new IOException("libraryFileName is null");
}
LibraryCacheManager.writeLibraryToStream(this.libraryFileName, out);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
new file mode 100644
index 0000000..84781cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+class AllVerticesIterator implements Iterator<ExecutionVertex2> {
+
+ private final Iterator<ExecutionJobVertex> jobVertices;
+
+ private ExecutionVertex2[] currVertices;
+
+ private int currPos;
+
+
+ public AllVerticesIterator(Iterator<ExecutionJobVertex> jobVertices) {
+ this.jobVertices = jobVertices;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ if (currVertices != null) {
+ if (currPos < currVertices.length) {
+ return true;
+ } else {
+ currVertices = null;
+ }
+ }
+ else if (jobVertices.hasNext()) {
+ currVertices = jobVertices.next().getTaskVertices();
+ currPos = 0;
+ }
+ else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public ExecutionVertex2 next() {
+ if (hasNext()) {
+ return currVertices[currPos++];
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
deleted file mode 100644
index 610b294..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DistributionPatternProvider.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-
-public final class DistributionPatternProvider {
-
- /**
- * Checks if two subtasks of different tasks should be wired.
- *
- * @param pattern
- * the distribution pattern that should be used
- * @param nodeLowerStage
- * the index of the producing task's subtask
- * @param nodeUpperStage
- * the index of the consuming task's subtask
- * @param sizeSetLowerStage
- * the number of subtasks of the producing task
- * @param sizeSetUpperStage
- * the number of subtasks of the consuming task
- * @return <code>true</code> if a wire between the two considered subtasks should be created, <code>false</code>
- * otherwise
- */
- public static boolean createWire(final DistributionPattern pattern, final int nodeLowerStage,
- final int nodeUpperStage, final int sizeSetLowerStage, final int sizeSetUpperStage) {
-
- switch (pattern) {
- case BIPARTITE:
- return true;
-
- case POINTWISE:
- if (sizeSetLowerStage < sizeSetUpperStage) {
- if (nodeLowerStage == (nodeUpperStage % sizeSetLowerStage)) {
- return true;
- }
- } else {
- if ((nodeLowerStage % sizeSetUpperStage) == nodeUpperStage) {
- return true;
- }
- }
-
- return false;
-
- /*
- * case STAR:
- * if (sizeSetLowerStage > sizeSetUpperStage) {
- * int groupNumber = nodeLowerStage / Math.max(sizeSetLowerStage / sizeSetUpperStage, 1);
- * if (nodeUpperStage == groupNumber) {
- * return true;
- * }
- * } else {
- * int groupNumber = nodeUpperStage / Math.max(sizeSetUpperStage / sizeSetLowerStage, 1);
- * if (nodeLowerStage == groupNumber) {
- * return true;
- * }
- * }
- * return false;
- */
-
- default:
- // this will never happen.
- throw new IllegalStateException("No Match for Distribution Pattern found.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
new file mode 100644
index 0000000..18e57f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.AbstractID;
+
+/**
+ * Unique identifier for the attempt to execute a task. Multiple attempts happen
+ * in cases of failures and recovery.
+ */
+public class ExecutionAttemptID extends AbstractID {
+
+ private static final long serialVersionUID = -1169683445778281344L;
+}