Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:35 UTC
[11/63] [abbrv] Refactor job graph construction to incremental
attachment based
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
index 09a6f5b..d703b4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
@@ -1,59 +1,259 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.executiongraph;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.apache.commons.logging.Log;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
+import org.apache.flink.runtime.taskmanager.TaskOperationResult;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.runtime.execution.ExecutionState2.*;
+/**
+ *
+ * NOTE ABOUT THE DESIGN RATIONALE:
+ *
+ * At several points in the code, we need to deal with possible concurrent state changes and actions.
+ * For example, while the call to deploy a task (send it to the TaskManager) happens, the task may get cancelled.
+ *
+ * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
+ * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
+ * command" call will never overtake the deploying call.
+ *
+ * This would block the threads for a long time, because the remote calls may take long. Depending on their locking
+ * behavior, it may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state
+ * updates and occasional double-checking to ensure that the state after a completed call is as expected, and trigger
+ * correcting actions if it is not. Many actions are also idempotent (like canceling).
+ */
public class ExecutionVertex2 {
+
+ private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
+
+ private static final AtomicReferenceFieldUpdater<ExecutionVertex2, AllocatedSlot> ASSIGNED_SLOT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, AllocatedSlot.class, "assignedSlot");
- private final JobVertexID jobVertexId;
+ private static final Log LOG = ExecutionGraph.LOG;
-
-
- public ExecutionVertex2() {
- this(new JobVertexID());
- }
+ private static final int NUM_CANCEL_CALL_TRIES = 3;
+
+ // --------------------------------------------------------------------------------------------
- public ExecutionVertex2(JobVertexID jobVertexId) {
- this.jobVertexId = jobVertexId;
+ private final ExecutionJobVertex jobVertex;
+
+ private final IntermediateResultPartition[] resultPartitions;
+
+ private final ExecutionEdge2[][] inputEdges;
+
+ private final int subTaskIndex;
+
+
+ private volatile ExecutionState2 state = CREATED;
+
+ private volatile AllocatedSlot assignedSlot;
+
+ private volatile Throwable failureCause;
+
+
+ public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
+ this.jobVertex = jobVertex;
+ this.subTaskIndex = subTaskIndex;
+
+ this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
+ for (int i = 0; i < producedDataSets.length; i++) {
+ IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
+ this.resultPartitions[i] = irp;
+ producedDataSets[i].setPartition(subTaskIndex, irp);
+ }
+
+ this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
}
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
public JobID getJobId() {
- return new JobID();
+ return this.jobVertex.getJobId();
}
-
public JobVertexID getJobvertexId() {
- return this.jobVertexId;
+ return this.jobVertex.getJobVertexId();
}
public String getTaskName() {
- return "task";
+ return this.jobVertex.getJobVertex().getName();
}
public int getTotalNumberOfParallelSubtasks() {
- return 1;
+ return this.jobVertex.getParallelism();
}
public int getParallelSubtaskIndex() {
- return 0;
+ return this.subTaskIndex;
+ }
+
+ public int getNumberOfInputs() {
+ return this.inputEdges.length;
+ }
+
+ public ExecutionEdge2[] getInputEdges(int input) {
+ if (input < 0 || input >= this.inputEdges.length) {
+ throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
+ }
+ return inputEdges[input];
+ }
+
+ public ExecutionState2 getExecutionState() {
+ return state;
+ }
+
+ public Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ public AllocatedSlot getAssignedResource() {
+ return assignedSlot;
+ }
+
+ private ExecutionGraph getExecutionGraph() {
+ return this.jobVertex.getGraph();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Graph building
+ // --------------------------------------------------------------------------------------------
+
+ public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
+
+ final DistributionPattern pattern = edge.getDistributionPattern();
+ final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
+
+ ExecutionEdge2[] edges = null;
+
+ switch (pattern) {
+ case POINTWISE:
+ edges = connectPointwise(sourcePartitions, inputNumber);
+ break;
+
+ case BIPARTITE:
+ edges = connectAllToAll(sourcePartitions, inputNumber);
+ break;
+
+ default:
+ throw new RuntimeException("Unrecognized distribution pattern.");
+
+ }
+
+ this.inputEdges[inputNumber] = edges;
+
+ // add the consumers to the source
+ for (ExecutionEdge2 ee : edges) {
+ ee.getSource().addConsumer(ee, consumerNumber);
+ }
+ }
+
+ private ExecutionEdge2[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+ ExecutionEdge2[] edges = new ExecutionEdge2[sourcePartitions.length];
+
+ for (int i = 0; i < sourcePartitions.length; i++) {
+ IntermediateResultPartition irp = sourcePartitions[i];
+ edges[i] = new ExecutionEdge2(irp, this, inputNumber);
+ }
+
+ return edges;
+ }
+
+ private ExecutionEdge2[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+ final int numSources = sourcePartitions.length;
+ final int parallelism = getTotalNumberOfParallelSubtasks();
+
+ // simple case: same number of sources as targets
+ if (numSources == parallelism) {
+ return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[subTaskIndex], this, inputNumber) };
+ }
+ else if (numSources < parallelism) {
+
+ int sourcePartition;
+
+ // check if the pattern is regular or irregular
+ // we use integer arithmetic for regular patterns, and floating-point arithmetic with rounding for irregular ones
+ if (parallelism % numSources == 0) {
+ // same number of targets per source
+ int factor = parallelism / numSources;
+ sourcePartition = subTaskIndex / factor;
+ }
+ else {
+ // different number of targets per source
+ float factor = ((float) parallelism) / numSources;
+ sourcePartition = (int) (subTaskIndex / factor);
+ }
+
+ return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[sourcePartition], this, inputNumber) };
+ }
+ else {
+ if (numSources % parallelism == 0) {
+ // same number of targets per source
+ int factor = numSources / parallelism;
+ int startIndex = subTaskIndex * factor;
+
+ ExecutionEdge2[] edges = new ExecutionEdge2[factor];
+ for (int i = 0; i < factor; i++) {
+ edges[i] = new ExecutionEdge2(sourcePartitions[startIndex + i], this, inputNumber);
+ }
+ return edges;
+ }
+ else {
+ float factor = ((float) numSources) / parallelism;
+
+ int start = (int) (subTaskIndex * factor);
+ int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
+ sourcePartitions.length :
+ (int) ((subTaskIndex + 1) * factor);
+
+ ExecutionEdge2[] edges = new ExecutionEdge2[end - start];
+ for (int i = 0; i < edges.length; i++) {
+ edges[i] = new ExecutionEdge2(sourcePartitions[start + i], this, inputNumber);
+ }
+
+ return edges;
+ }
+ }
}
@@ -61,15 +261,398 @@ public class ExecutionVertex2 {
// Scheduling
// --------------------------------------------------------------------------------------------
+ /**
+ * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the task needs
+ * to be scheduled immediately and no resource is available. If the task is accepted by the scheduler, any
+ * error sets the vertex state to failed and triggers the recovery logic.
+ *
+ * @param scheduler The scheduler to use for scheduling this vertex.
+ *
+ * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
+ * @throws NoResourceAvailableException Thrown, if no queued scheduling is allowed and no resources are currently available.
+ */
+ public void scheduleForExecution(DefaultScheduler scheduler) throws NoResourceAvailableException {
+ if (scheduler == null) {
+ throw new NullPointerException();
+ }
+
+ if (STATE_UPDATER.compareAndSet(this, CREATED, SCHEDULED)) {
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, SCHEDULED, null);
+
+ ScheduledUnit toSchedule = new ScheduledUnit(this, jobVertex.getSlotSharingGroup());
+
+ // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
+ // in all cases where the deployment failed. We use many try {} finally {} clauses to ensure this.
+
+ boolean queued = jobVertex.getGraph().isQueuedSchedulingAllowed();
+ if (queued) {
+ SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
+
+ future.setFutureAction(new SlotAllocationFutureAction() {
+ @Override
+ public void slotAllocated(AllocatedSlot slot) {
+ try {
+ deployToSlot(slot);
+ }
+ catch (Throwable t) {
+ try {
+ slot.releaseSlot();
+ } finally {
+ fail(t);
+ }
+ }
+ }
+ });
+ }
+ else {
+ AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
+ try {
+ deployToSlot(slot);
+ }
+ catch (Throwable t) {
+ try {
+ slot.releaseSlot();
+ } finally {
+ fail(t);
+ }
+ }
+ }
+ }
+ else if (this.state == CANCELED) {
+ // this can occur very rarely through heavy races. if the task was canceled, we do not
+ // schedule it
+ return;
+ }
+ else {
+ throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
+ }
+ }
+
+
+ public void deployToSlot(final AllocatedSlot slot) throws JobException {
+ // sanity checks
+ if (slot == null) {
+ throw new NullPointerException();
+ }
+ if (!slot.isAlive()) {
+ throw new IllegalArgumentException("Cannot deploy to a slot that is not alive.");
+ }
+
+ // make sure exactly one deployment call happens from the correct state
+ // note: the transition from CREATED to DEPLOYING is for testing purposes only
+ ExecutionState2 previous = this.state;
+ if (previous == SCHEDULED || previous == CREATED) {
+ if (!STATE_UPDATER.compareAndSet(this, previous, DEPLOYING)) {
+ // race condition, someone else beat us to the deploying call.
+ // this should actually not happen and indicates a race somewhere else
+ throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
+ }
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, DEPLOYING, null);
+ }
+ else {
+ // vertex may have been cancelled, or it was already scheduled
+ throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
+ }
+
+ // good, we are allowed to deploy
+ if (!slot.setExecutedVertex(this)) {
+ throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
+ }
+ setAssignedSlot(slot);
+
+
+ final TaskDeploymentDescriptor deployment = createDeploymentDescriptor();
+
+ // we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
+ Runnable deployaction = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Instance instance = slot.getInstance();
+ instance.checkLibraryAvailability(getJobId());
+
+ TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
+ if (result.isSuccess()) {
+ switchToRunning();
+ }
+ else {
+ // deployment failed :(
+ fail(new Exception("Failed to deploy the task to slot " + slot + ": " + result.getDescription()));
+ }
+ }
+ catch (Throwable t) {
+ // some error occurred. fail the task
+ fail(t);
+ }
+ }
+ };
+
+ execute(deployaction);
+ }
+
+ private void switchToRunning() {
+
+ // transition state
+ if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, DEPLOYING, RUNNING)) {
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, RUNNING, null);
+
+ this.jobVertex.vertexSwitchedToRunning(subTaskIndex);
+ }
+ else {
+ // something happened while the call was in progress.
+ // typically, that means canceling while deployment was in progress
+
+ ExecutionState2 currentState = ExecutionVertex2.this.state;
+
+ if (currentState == CANCELING) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Concurrent canceling of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
+ }
+
+ sendCancelRpcCall();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Concurrent unexpected state transition of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
+ }
+
+ // undo the deployment
+ sendCancelRpcCall();
+
+ // record the failure
+ fail(new Exception("Asynchronous state error. Execution Vertex switched to " + currentState + " while deployment was in progress."));
+ }
+ }
+ }
+
+ public void cancel() {
+ // depending on the previous state, we go directly to cancelled (no cancel call necessary)
+ // -- or to canceling (cancel call needs to be sent to the task manager)
+
+ // because of several possible previous states, we need to loop until we make a
+ // successful atomic state transition
+ while (true) {
+
+ ExecutionState2 current = this.state;
+
+ if (current == CANCELING || current == CANCELED) {
+ // already taken care of, no need to cancel again
+ return;
+ }
+
+ // these two are the common cases where we need to send a cancel call
+ else if (current == RUNNING || current == DEPLOYING) {
+ // try to transition to canceling, if successful, send the cancel call
+ if (STATE_UPDATER.compareAndSet(this, current, CANCELING)) {
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELING, null);
+
+ sendCancelRpcCall();
+ return;
+ }
+ // else: fall through the loop
+ }
+
+ else if (current == FINISHED || current == FAILED) {
+ // nothing to do any more. The task finished or failed before it could be cancelled.
+ // in any case, the task is removed from the TaskManager already
+ return;
+ }
+ else if (current == CREATED || current == SCHEDULED) {
+ // from here, we can directly switch to cancelled, because no task has been deployed yet
+ if (STATE_UPDATER.compareAndSet(this, current, CANCELED)) {
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELED, null);
+
+ return;
+ }
+ // else: fall through the loop
+ }
+ else {
+ throw new IllegalStateException(current.name());
+ }
+ }
+ }
+
+ public void fail(Throwable t) {
+
+ // damn, we failed. This means only that we keep our books and notify our parent ExecutionJobVertex.
+ // the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure
+
+ // we may need to loop multiple times (in the presence of concurrent calls) in order to
+ // atomically switch to failed
+ while (true) {
+ ExecutionState2 current = this.state;
+
+ if (current == FAILED) {
+ // concurrently set to failed. It is enough to remember once that we failed (it's sad enough)
+ return;
+ }
+
+ if (current == CANCELED) {
+ // we are already aborting
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
+ getSimpleName(), FAILED, current));
+ }
+ return;
+ }
+
+ // we should be in DEPLOYING or RUNNING when a regular failure happens
+ if (current != DEPLOYING && current != RUNNING && current != CANCELING) {
+ // this should not happen. still, what else to do but to comply and go to the FAILED state
+ // at least we should complain loudly to the log
+ LOG.error(String.format("Vertex %s unexpectedly went from state %s to %s with error: %s",
+ getSimpleName(), current, FAILED, t.getMessage()), t);
+ }
+
+ if (STATE_UPDATER.compareAndSet(this, current, FAILED)) {
+ // success (in a manner of speaking)
+ this.failureCause = t;
+
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, FAILED, StringUtils.stringifyException(t));
+
+ // release the slot (concurrency safe)
+ setAssignedSlot(null);
+
+ this.jobVertex.vertexFailed(subTaskIndex);
+
+ // leave the loop
+ return;
+ }
+ }
+ }
+
+ private void sendCancelRpcCall() {
+ // first of all, copy a reference to the stack. any concurrent change to the
+ // field does not affect us now
+ final AllocatedSlot slot = this.assignedSlot;
+ if (slot == null) {
+ throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
+ }
+
+ Runnable cancelAction = new Runnable() {
+
+ @Override
+ public void run() {
+ Throwable exception = null;
+
+ for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
+
+ try {
+ // send the call. it may be that the task is not really there (asynchronous / overtaking messages)
+ // in which case it is fine (the deployer catches it)
+ TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(getJobvertexId(), subTaskIndex);
+
+ if (result.isSuccess()) {
+
+ // make sure that we release the slot
+ try {
+ // found and canceled
+ if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, CANCELING, CANCELED)) {
+ // we completed the call.
+ // release the slot resource and let the parent know we have cancelled
+ ExecutionVertex2.this.jobVertex.vertexCancelled(ExecutionVertex2.this.subTaskIndex);
+ }
+ else {
+ ExecutionState2 foundState = ExecutionVertex2.this.state;
+ // failing in the meantime may happen and is no problem
+ if (foundState != FAILED) {
+ // corner case? log at least
+ LOG.error(String.format("Asynchronous race: Found state %s after successful cancel call.", foundState));
+ }
+
+ }
+ } finally {
+ slot.releaseSlot();
+ }
+ }
+ else {
+ // the task was not found, which may be when the task concurrently finishes or fails, or
+ // when the cancel call overtakes the deployment call
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancel task call did not find task. Probable cause: Acceptable asynchronous race.");
+ }
+ }
+
+ // in any case, we need not call multiple times, so we quit
+ return;
+ }
+ catch (Throwable t) {
+ if (exception == null) {
+ exception = t;
+ }
+ LOG.error("Canceling vertex " + getSimpleName() + " failed (" + triesLeft + " tries left): " + t.getMessage() , t);
+ }
+ }
+
+ // dang, utterly unsuccessful - the target node must be down, in which case the tasks are lost anyways
+ fail(new Exception("Task could not be canceled.", exception));
+ }
+ };
+
+ execute(cancelAction);
+ }
+
public Iterable<Instance> getPreferredLocations() {
return null;
}
+ private void setAssignedSlot(AllocatedSlot slot) {
+
+ while (true) {
+ AllocatedSlot previous = this.assignedSlot;
+ if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, previous, slot)) {
+ // successfully swapped
+ // release the predecessor, if it was not null. this call is idempotent, so it does not matter if it is
+ // called more than once
+ try {
+ if (previous != null) {
+ previous.releaseSlot();
+ }
+ } catch (Throwable t) {
+ LOG.debug("Error releasing slot " + slot, t);
+ }
+ return;
+ }
+ }
+ }
+
+
+ private TaskDeploymentDescriptor createDeploymentDescriptor() {
+ // create the input gate deployment descriptors
+ List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
+ for (ExecutionEdge2[] channels : inputEdges) {
+ inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+ }
+
+ // create the output gate deployment descriptors
+ List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
+ for (IntermediateResultPartition partition : resultPartitions) {
+ for (List<ExecutionEdge2> channels : partition.getConsumers()) {
+ outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+ }
+ }
+
+ String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
+
+ return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), getTaskName(),
+ subTaskIndex, getTotalNumberOfParallelSubtasks(),
+ getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
+ jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles);
+ }
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
+ public void execute(Runnable action) {
+ this.jobVertex.execute(action);
+ }
+
/**
* Creates a simple name representation in the style 'taskname (x/y)', where
* 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
@@ -81,4 +664,9 @@ public class ExecutionVertex2 {
public String getSimpleName() {
return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
}
+
+ @Override
+ public String toString() {
+ return getSimpleName() + " [" + state + ']';
+ }
}
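
The connectPointwise() method above maps each consumer subtask to its source partitions: integer division when the fan-out is regular, floating-point division with truncation otherwise. The following standalone sketch is not part of the commit; the class and method names are made up purely to illustrate the fewer-sources-than-consumers branch, and the main method prints one small worked mapping.

import java.util.Arrays;

// Illustrative sketch (not part of the commit): the index arithmetic used by
// connectPointwise() when there are fewer source partitions than consumer subtasks.
// Each consumer subtask reads exactly one source partition; regular fan-outs use
// integer division, irregular ones use floating-point division with truncation.
public class PointwiseMappingSketch {

    static int sourceForSubtask(int subTaskIndex, int numSources, int parallelism) {
        if (parallelism % numSources == 0) {
            // regular: the same number of consumers per source
            int factor = parallelism / numSources;
            return subTaskIndex / factor;
        } else {
            // irregular: spread consumers as evenly as possible
            float factor = ((float) parallelism) / numSources;
            return (int) (subTaskIndex / factor);
        }
    }

    public static void main(String[] args) {
        // 2 source partitions, 5 consumer subtasks:
        // subtasks 0-2 read partition 0, subtasks 3-4 read partition 1
        int[] mapping = new int[5];
        for (int i = 0; i < mapping.length; i++) {
            mapping[i] = sourceForSubtask(i, 2, 5);
        }
        System.out.println(Arrays.toString(mapping)); // prints [0, 0, 0, 1, 1]
    }
}

With 2 sources and 5 consumer subtasks, the irregular branch assigns three consumers to the first partition and two to the second, which is the even split the rounding is intended to produce.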
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
deleted file mode 100644
index 87e2120..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertexID.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.managementgraph.ManagementVertexID;
-
-/**
- * A class for statistically unique execution vertex IDs.
- *
- */
-public class ExecutionVertexID extends AbstractID {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates a new random execution vertex id.
- */
- public ExecutionVertexID() {
- super();
- }
-
- /**
- * Creates a new execution vertex id, equal to the given id.
- *
- * @param from The id to copy.
- */
- public ExecutionVertexID(AbstractID from) {
- super(from);
- }
-
- /**
- * Converts the execution vertex ID into a
- * management vertex ID. The new management vertex ID
- * will be equal to the execution vertex ID in the sense
- * that the <code>equals</code> method will return <code>
- * true</code> when both IDs are compared.
- *
- * @return the new management vertex ID
- */
- public ManagementVertexID toManagementVertexID() {
- return new ManagementVertexID(this);
- }
-
- /**
- * Converts the given management vertex ID into the corresponding execution vertex ID. The new execution vertex ID
- * will be equals to the management vertex ID in the sense that the <code>equals</code> method will return
- * <code>true</code> when both IDs are compared.
- *
- * @param vertexID
- * the management vertex ID to be converted
- * @return the resulting execution vertex ID
- */
- public static ExecutionVertexID fromManagementVertexID(ManagementVertexID vertexID) {
- return new ExecutionVertexID(vertexID);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
deleted file mode 100644
index 75e2e95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GraphConversionException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-/**
- * A graph conversion exception is thrown if the creation of transformation
- * of an {@link ExecutionGraph} fails.
- *
- */
-public class GraphConversionException extends Exception {
-
- /**
- * Generated serial version UID.
- */
- private static final long serialVersionUID = -7623370680208569211L;
-
- /**
- * Creates a new exception with the given error message.
- *
- * @param msg
- * the error message to be transported through this exception
- */
- public GraphConversionException(String msg) {
- super(msg);
- }
-
- public GraphConversionException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public GraphConversionException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
new file mode 100644
index 0000000..540996f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+public class IntermediateResult {
+
+ private final IntermediateDataSetID id;
+
+ private final ExecutionJobVertex producer;
+
+ private final IntermediateResultPartition[] partitions;
+
+ private final int numParallelProducers;
+
+ private int partitionsAssigned;
+
+ private int numConsumers;
+
+
+ public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
+ this.id = id;
+ this.producer = producer;
+ this.partitions = new IntermediateResultPartition[numParallelProducers];
+ this.numParallelProducers = numParallelProducers;
+
+ // we do not set the intermediate result partitions here, because we let them be initialized by
+ // the execution vertex that produces them
+ }
+
+ public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
+ if (partition == null || partitionNumber < 0 || partitionNumber >= numParallelProducers) {
+ throw new IllegalArgumentException();
+ }
+
+ if (partitions[partitionNumber] != null) {
+ throw new IllegalStateException("Partition #" + partitionNumber + " has already been assigned.");
+ }
+
+ partitions[partitionNumber] = partition;
+ partitionsAssigned++;
+ }
+
+
+
+ public IntermediateDataSetID getId() {
+ return id;
+ }
+
+ public IntermediateResultPartition[] getPartitions() {
+ return partitions;
+ }
+
+ public int registerConsumer() {
+ final int index = numConsumers;
+ numConsumers++;
+
+ for (IntermediateResultPartition p : partitions) {
+ if (p.addConsumerGroup() != index) {
+ throw new RuntimeException("Inconsistent consumer mapping between intermediate result partitions.");
+ }
+ }
+ return index;
+ }
+}
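
IntermediateResult.registerConsumer() above hands out one consumer-group index per consuming vertex and expects every partition to report the same index back, so that the per-partition edge lists stay aligned. The sketch below is not part of the commit and uses made-up minimal classes only to show that invariant in isolation.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not part of the commit): the consumer-group bookkeeping
// behind IntermediateResult.registerConsumer(). Every consuming job vertex gets
// one group index, and every partition must hand back the same index for that
// group, otherwise the per-partition edge lists would go out of sync.
public class ConsumerGroupSketch {

    static class Partition {
        final List<List<String>> consumerGroups = new ArrayList<>();

        int addConsumerGroup() {
            consumerGroups.add(new ArrayList<>());
            return consumerGroups.size() - 1;
        }
    }

    static class Result {
        final Partition[] partitions;
        int numConsumers;

        Result(int numPartitions) {
            partitions = new Partition[numPartitions];
            for (int i = 0; i < numPartitions; i++) {
                partitions[i] = new Partition();
            }
        }

        int registerConsumer() {
            final int index = numConsumers++;
            for (Partition p : partitions) {
                if (p.addConsumerGroup() != index) {
                    throw new IllegalStateException("Inconsistent consumer mapping.");
                }
            }
            return index;
        }
    }

    public static void main(String[] args) {
        Result result = new Result(3);
        System.out.println(result.registerConsumer()); // 0 -> first consuming vertex
        System.out.println(result.registerConsumer()); // 1 -> second consuming vertex
    }
}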
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
new file mode 100644
index 0000000..13bb930
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class IntermediateResultPartition {
+
+ private final IntermediateResult totalResult;
+
+ private final ExecutionVertex2 producer;
+
+ private final int partition;
+
+ private List<List<ExecutionEdge2>> consumers;
+
+
+ public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex2 producer, int partition) {
+ this.totalResult = totalResult;
+ this.producer = producer;
+ this.partition = partition;
+ this.consumers = new ArrayList<List<ExecutionEdge2>>(0);
+ }
+
+ public ExecutionVertex2 getProducer() {
+ return producer;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public IntermediateResult getIntermediateResult() {
+ return totalResult;
+ }
+
+ public List<List<ExecutionEdge2>> getConsumers() {
+ return consumers;
+ }
+
+ int addConsumerGroup() {
+ int pos = consumers.size();
+ consumers.add(new ArrayList<ExecutionEdge2>());
+ return pos;
+ }
+
+ public void addConsumer(ExecutionEdge2 edge, int consumerNumber) {
+ consumers.get(consumerNumber).add(edge);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
deleted file mode 100644
index f7ccda1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalJobStatus.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-/**
- * This enumeration contains all states a job represented by an {@link ExecutionGraph} can have during its lifetime. It
- * contains all states from {@link JobStatus} but also internal states to keep track of shutdown processes.
- * <p>
- * This class is thread-safe.
- *
- */
-public enum InternalJobStatus {
-
- /**
- * All tasks of the job are in the execution state CREATED.
- */
- CREATED,
-
- /**
- * All tasks of the job have been accepted by the scheduler, resources have been requested
- */
- SCHEDULED,
-
- /**
- * At least one task of the job is running, none has definitely failed.
- */
- RUNNING,
-
- /**
- * At least one task of the job has definitely failed and cannot be recovered. The job is in the process of being
- * terminated.
- */
- FAILING,
-
- /**
- * At least one task of the job has definitively failed and cannot
- * be recovered anymore. As a result, the job has been terminated.
- */
- FAILED,
-
- /**
- * At least one task has been canceled as a result of a user request. The job is in the process of being canceled
- * completely.
- */
- CANCELING,
-
- /**
- * All tasks of the job are canceled as a result of a user request. The job has been terminated.
- */
- CANCELED,
-
- /**
- * All of the job's tasks have successfully finished.
- */
- FINISHED;
-
- /**
- * Converts an internal job status in a {@link JobStatus} state.
- *
- * @param status
- * the internal job status to converted.
- * @return the corresponding job status or <code>null</code> if no corresponding job status exists
- */
- @SuppressWarnings("incomplete-switch")
- public static JobStatus toJobStatus(InternalJobStatus status) {
-
- switch (status) {
-
- case CREATED:
- return JobStatus.CREATED;
- case SCHEDULED:
- return JobStatus.SCHEDULED;
- case RUNNING:
- return JobStatus.RUNNING;
- case FAILED:
- return JobStatus.FAILED;
- case CANCELED:
- return JobStatus.CANCELED;
- case FINISHED:
- return JobStatus.FINISHED;
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
index aabed5c..512a381 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -16,25 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
/**
- * This interface allows objects to receive notifications
- * when the status of an observed job has changed.
- *
+ * This interface allows objects to receive notifications when the status of an observed job has changed.
*/
public interface JobStatusListener {
-/**
- * Called when the status of the job with the given {@li
+ /**
+ * Called when the status of the job changed.
*
- * @param executionGraph
- * the executionGraph representing the job the event belongs to
- * @param newJobStatus
- * the new job status
- * @param optionalMessage
- * an optional message (possibly <code>null</code>) that can be attached to the state change
+ * @param executionGraph The executionGraph representing the job.
+ * @param newJobStatus The new job status.
+ * @param optionalMessage An optional message (possibly <code>null</code>) that can be attached to the state change.
*/
- void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage);
+ void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage);
}
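
With the signature change above, an implementation of the listener only needs to handle the new JobStatus argument. The class below is a hypothetical example, not part of the commit; it assumes the flink-runtime classes as changed in this patch are on the classpath and simply prints every transition it observes.

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobStatus;

// Illustrative sketch (not part of the commit): a listener that prints every
// job status transition. The class name is made up; the interface is exactly
// the one changed above.
public class LoggingJobStatusListener implements JobStatusListener {

    @Override
    public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) {
        System.out.println("Job status changed to " + newJobStatus
                + (optionalMessage == null ? "" : ": " + optionalMessage));
    }
}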
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index cb7e658..f3f489b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.instance;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
@@ -30,7 +31,10 @@ public class AllocatedSlot {
private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
-
+
+ private static final AtomicReferenceFieldUpdater<AllocatedSlot, ExecutionVertex2> VERTEX_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, ExecutionVertex2.class, "executedVertex");
+
private static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running
private static final int CANCELLED = 1; // no more tasks may run
private static final int RELEASED = 2; // has been given back to the instance
@@ -45,10 +49,13 @@ public class AllocatedSlot {
/** The number of the slot on which the task is deployed */
private final int slotNumber;
+ /** Vertex being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+ private volatile ExecutionVertex2 executedVertex;
+
+ /** The state of the slot, only atomically updated */
private volatile int status = ALLOCATED_AND_ALIVE;
-
public AllocatedSlot(JobID jobID, Instance instance, int slotNumber) {
if (jobID == null || instance == null || slotNumber < 0) {
throw new IllegalArgumentException();
@@ -78,18 +85,34 @@ public class AllocatedSlot {
return slotNumber;
}
- // --------------------------------------------------------------------------------------------
-
- /**
- * @param vertex
- *
- * @return True, if the task was scheduled correctly, false if the slot was asynchronously deallocated
- * in the meantime.
- */
- public boolean runTask(ExecutionVertex2 vertex) {
+ public boolean setExecutedVertex(ExecutionVertex2 executedVertex) {
+ if (executedVertex == null) {
+ throw new NullPointerException();
+ }
+
+ // check that we can actually run in this slot
+ if (status != ALLOCATED_AND_ALIVE) {
+ return false;
+ }
+
+ // atomically assign the vertex
+ if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+ return false;
+ }
+
+ // we need to do a double check that we were not cancelled in the meantime
+ if (status != ALLOCATED_AND_ALIVE) {
+ this.executedVertex = null;
+ return false;
+ }
+
return true;
}
+ public ExecutionVertex2 getExecutedVertex() {
+ return executedVertex;
+ }
+
// --------------------------------------------------------------------------------------------
// Status and life cycle
// --------------------------------------------------------------------------------------------
@@ -110,6 +133,9 @@ public class AllocatedSlot {
public void cancel() {
if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
// kill all tasks currently running in this slot
+ if (this.executedVertex != null) {
+ this.executedVertex.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
+ }
}
}
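
setExecutedVertex() above relies on a compare-and-set of the vertex reference followed by a re-read of the volatile status, so a slot that is cancelled concurrently never keeps a task assigned. The standalone sketch below is not part of the commit; all names are illustrative, and it reproduces only the assign / double-check / roll-back shape of that logic.

import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch (not part of the commit): the "compare-and-set, then
// double-check" pattern used by AllocatedSlot.setExecutedVertex(). The volatile
// status read after the successful CAS catches the race where the slot is
// cancelled between the first status check and the assignment.
public class SlotAssignSketch {

    private volatile boolean alive = true;
    private final AtomicReference<Runnable> task = new AtomicReference<>();

    boolean assign(Runnable newTask) {
        if (!alive) {
            return false;                 // already cancelled, refuse the task
        }
        if (!task.compareAndSet(null, newTask)) {
            return false;                 // another task already won the slot
        }
        if (!alive) {
            task.set(null);               // cancelled concurrently: roll back
            return false;
        }
        return true;
    }

    void cancel() {
        alive = false;
        Runnable t = task.get();
        if (t != null) {
            // in the real code this is where the vertex running in the slot would be failed
        }
    }

    public static void main(String[] args) {
        SlotAssignSketch slot = new SlotAssignSketch();
        System.out.println(slot.assign(() -> {}));  // true
        System.out.println(slot.assign(() -> {}));  // false, slot already taken
    }
}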
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
deleted file mode 100644
index eca23c4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * A simple implementation of an {@link InstanceManager}.
- */
-public class DefaultInstanceManager implements InstanceManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultInstanceManager.class);
-
- // ------------------------------------------------------------------------
- // Fields
- // ------------------------------------------------------------------------
-
- /** Global lock */
- private final Object lock = new Object();
-
- /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
- private final Map<InstanceID, Instance> registeredHostsById;
-
- /** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
- private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
-
- /** Set of hosts that were present once and have died */
- private final Set<InstanceConnectionInfo> deadHosts;
-
- /** Listeners that want to be notified about availability and disappearance of instances */
- private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
-
- /** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
- private final long heartbeatTimeout;
-
- /** The total number of task slots that the system has */
- private int totalNumberOfAliveTaskSlots;
-
- /** Flag marking the system as shut down */
- private volatile boolean shutdown;
-
- // ------------------------------------------------------------------------
- // Constructor and set-up
- // ------------------------------------------------------------------------
-
- /**
- * Creates an instance manager, using the global configuration value for maximum interval between heartbeats
- * where a task manager is still considered alive.
- */
- public DefaultInstanceManager() {
- this(1000 * GlobalConfiguration.getLong(
- ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
- }
-
- public DefaultInstanceManager(long heartbeatTimeout) {
- this(heartbeatTimeout, heartbeatTimeout);
- }
-
- public DefaultInstanceManager(long heartbeatTimeout, long cleanupInterval) {
- if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
- throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
- }
-
- this.registeredHostsById = new HashMap<InstanceID, Instance>();
- this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
- this.deadHosts = new HashSet<InstanceConnectionInfo>();
- this.heartbeatTimeout = heartbeatTimeout;
-
- new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
- }
-
- @Override
- public void shutdown() {
- synchronized (this.lock) {
- if (this.shutdown) {
- return;
- }
- this.shutdown = true;
-
- this.cleanupStaleMachines.cancel();
-
- for (Instance i : this.registeredHostsById.values()) {
- i.markDead();
- }
-
- this.registeredHostsById.clear();
- this.registeredHostsByConnection.clear();
- this.deadHosts.clear();
- this.totalNumberOfAliveTaskSlots = 0;
- }
- }
-
- @Override
- public boolean reportHeartBeat(InstanceID instanceId) {
- if (instanceId == null) {
- throw new IllegalArgumentException("InstanceID may not be null.");
- }
-
- synchronized (this.lock) {
- if (this.shutdown) {
- throw new IllegalStateException("InstanceManager is shut down.");
- }
-
- Instance host = registeredHostsById.get(instanceId);
-
- if (host == null){
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() +
- " Possibly TaskManager was maked as dead (timed-out) earlier. " +
- "Reporting back that task manager is no longer known.");
- }
- return false;
- }
-
- host.reportHeartBeat();
- return true;
- }
- }
-
- @Override
- public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
- synchronized(this.lock){
- if (this.shutdown) {
- throw new IllegalStateException("InstanceManager is shut down.");
- }
-
- Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
- if (prior != null) {
- LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo +
- ". This connection is already registered under ID " + prior.getId());
- return null;
- }
-
- boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
- if (wasDead) {
- LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo +
- " which was marked as dead earlier because of a heart-beat timeout.");
- }
-
- InstanceID id = null;
- do {
- id = new InstanceID();
- } while (registeredHostsById.containsKey(id));
-
-
- Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
-
- registeredHostsById.put(id, host);
- registeredHostsByConnection.put(instanceConnectionInfo, host);
-
- totalNumberOfAliveTaskSlots += numberOfSlots;
-
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
- instanceConnectionInfo, id, registeredHostsById.size()));
- }
-
- host.reportHeartBeat();
-
- // notify all listeners (for example the scheduler)
- notifyNewInstance(host);
-
- return id;
- }
- }
-
- @Override
- public int getNumberOfRegisteredTaskManagers() {
- return this.registeredHostsById.size();
- }
-
- @Override
- public int getTotalNumberOfSlots() {
- return this.totalNumberOfAliveTaskSlots;
- }
-
- @Override
- public Map<InstanceID, Instance> getAllRegisteredInstances() {
- return this.registeredHostsById;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public void addInstanceListener(InstanceListener listener) {
- synchronized (this.instanceListeners) {
- this.instanceListeners.add(listener);
- }
- }
-
- public void removeInstanceListener(InstanceListener listener) {
- synchronized (this.instanceListeners) {
- this.instanceListeners.remove(listener);
- }
- }
-
- private void notifyNewInstance(Instance instance) {
- synchronized (this.instanceListeners) {
- for (InstanceListener listener : this.instanceListeners) {
- try {
- listener.newInstanceAvailable(instance);
- }
- catch (Throwable t) {
- LOG.error("Notification of new instance availability failed.", t);
- }
- }
- }
- }
-
- private void notifyDeadInstance(Instance instance) {
- synchronized (this.instanceListeners) {
- for (InstanceListener listener : this.instanceListeners) {
- try {
- listener.instanceDied(instance);
- }
- catch (Throwable t) {
- LOG.error("Notification of dead instance failed.", t);
- }
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private void checkForDeadInstances() {
- final long now = System.currentTimeMillis();
- final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
-
- synchronized (DefaultInstanceManager.this.lock) {
- if (DefaultInstanceManager.this.shutdown) {
- return;
- }
-
- final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-
- // check all hosts whether they did not send heart-beat messages.
- while (entries.hasNext()) {
-
- final Map.Entry<InstanceID, Instance> entry = entries.next();
- final Instance host = entry.getValue();
-
- if (!host.isStillAlive(now, timeout)) {
-
- // remove from the living
- entries.remove();
- registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
-
- // add to the dead
- deadHosts.add(host.getInstanceConnectionInfo());
-
- host.markDead();
-
- totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
-
- LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
- host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
-
- // report to all listeners
- notifyDeadInstance(host);
- }
- }
- }
- }
- // --------------------------------------------------------------------------------------------
-
- /**
- * Periodic task that checks whether hosts have not sent their heart-beat
- * messages and purges the hosts in this case.
- */
- private final TimerTask cleanupStaleMachines = new TimerTask() {
- @Override
- public void run() {
- try {
- checkForDeadInstances();
- }
- catch (Throwable t) {
- LOG.error("Checking for dead instances failed.", t);
- }
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index a168b2c..543ae86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,6 +27,10 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileResponse;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailablilityListener;
@@ -186,6 +190,28 @@ public class Instance {
}
}
+ public void checkLibraryAvailability(JobID jobID) throws IOException {
+ String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
+
+ if (requiredLibraries == null) {
+ throw new IOException("No entry of required libraries for job " + jobID);
+ }
+
+ LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+ request.setRequiredLibraries(requiredLibraries);
+
+ // Send the request
+ LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
+
+ // Check response and transfer libraries if necessary
+ for (int k = 0; k < requiredLibraries.length; k++) {
+ if (!response.isCached(k)) {
+ LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
+ getTaskManagerProxy().updateLibraryCache(update);
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Heartbeats
// --------------------------------------------------------------------------------------------
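The checkLibraryAvailability() method added above implements a simple pull/push protocol: look up the jar files registered for the job, ask the TaskManager which of them it already caches, and transfer only the missing ones. A minimal sketch of a call site follows; the class and method names are placeholders for illustration, only the Instance API shown in the diff is used.

import java.io.IOException;

import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;

public class LibraryDistributionSketch {

    /**
     * Makes sure the TaskManager behind the given instance has all jar files of the job
     * in its library cache before any task of that job is deployed to it.
     */
    public static void ensureJarsAvailable(Instance instance, JobID jobId) {
        try {
            // profiles the TaskManager's library cache and transfers only the missing jars
            instance.checkLibraryAvailability(jobId);
        }
        catch (IOException e) {
            // either no libraries were registered for the job, or the transfer failed
            throw new RuntimeException("Could not distribute libraries for job " + jobId, e);
        }
    }
}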
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 09c384a..5e139f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -18,22 +18,289 @@
package org.apache.flink.runtime.instance;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
/**
 * Simple manager that keeps track of which TaskManagers are available and alive.
*/
-public interface InstanceManager {
+public class InstanceManager {
+
+ private static final Log LOG = LogFactory.getLog(InstanceManager.class);
+
+ // ------------------------------------------------------------------------
+ // Fields
+ // ------------------------------------------------------------------------
+
+ /** Global lock */
+ private final Object lock = new Object();
+
+ /** Hosts known to run a TaskManager and thus able to execute tasks, indexed by instance ID. */
+ private final Map<InstanceID, Instance> registeredHostsById;
+
+ /** Hosts known to run a TaskManager and thus able to execute tasks, indexed by connection info. */
+ private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
+
+ /** Set of hosts that were present once and have died */
+ private final Set<InstanceConnectionInfo> deadHosts;
+
+ /** Listeners that want to be notified about availability and disappearance of instances */
+ private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
- InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfTaskSlots);
+ /** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
+ private final long heartbeatTimeout;
- boolean reportHeartBeat(InstanceID instance);
+ /** The total number of task slots that the system has */
+ private int totalNumberOfAliveTaskSlots;
+
+ /** Flag marking the system as shut down */
+ private volatile boolean shutdown;
+
+ // ------------------------------------------------------------------------
+ // Constructor and set-up
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates an instance manager, using the global configuration value for the maximum interval between heartbeats
+ * within which a task manager is still considered alive.
+ */
+ public InstanceManager() {
+ this(1000 * GlobalConfiguration.getLong(
+ ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
+ }
+
+ public InstanceManager(long heartbeatTimeout) {
+ this(heartbeatTimeout, heartbeatTimeout);
+ }
+
+ public InstanceManager(long heartbeatTimeout, long cleanupInterval) {
+ if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
+ throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
+ }
+
+ this.registeredHostsById = new HashMap<InstanceID, Instance>();
+ this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
+ this.deadHosts = new HashSet<InstanceConnectionInfo>();
+ this.heartbeatTimeout = heartbeatTimeout;
+
+ new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
+ }
+
+ public void shutdown() {
+ synchronized (this.lock) {
+ if (this.shutdown) {
+ return;
+ }
+ this.shutdown = true;
+
+ this.cleanupStaleMachines.cancel();
- void shutdown();
+ for (Instance i : this.registeredHostsById.values()) {
+ i.markDead();
+ }
+
+ this.registeredHostsById.clear();
+ this.registeredHostsByConnection.clear();
+ this.deadHosts.clear();
+ this.totalNumberOfAliveTaskSlots = 0;
+ }
+ }
- Map<InstanceID, Instance> getAllRegisteredInstances();
+ public boolean reportHeartBeat(InstanceID instanceId) {
+ if (instanceId == null) {
+ throw new IllegalArgumentException("InstanceID may not be null.");
+ }
+
+ synchronized (this.lock) {
+ if (this.shutdown) {
+ throw new IllegalStateException("InstanceManager is shut down.");
+ }
+
+ Instance host = registeredHostsById.get(instanceId);
- int getNumberOfRegisteredTaskManagers();
+ if (host == null){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() +
+ " Possibly TaskManager was maked as dead (timed-out) earlier. " +
+ "Reporting back that task manager is no longer known.");
+ }
+ return false;
+ }
- int getTotalNumberOfSlots();
+ host.reportHeartBeat();
+ return true;
+ }
+ }
+
+ public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
+ synchronized(this.lock){
+ if (this.shutdown) {
+ throw new IllegalStateException("InstanceManager is shut down.");
+ }
+
+ Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
+ if (prior != null) {
+ LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo +
+ ". This connection is already registered under ID " + prior.getId());
+ return null;
+ }
+
+ boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
+ if (wasDead) {
+ LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo +
+ " which was marked as dead earlier because of a heart-beat timeout.");
+ }
+
+ InstanceID id = null;
+ do {
+ id = new InstanceID();
+ } while (registeredHostsById.containsKey(id));
+
+
+ Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
+
+ registeredHostsById.put(id, host);
+ registeredHostsByConnection.put(instanceConnectionInfo, host);
+
+ totalNumberOfAliveTaskSlots += numberOfSlots;
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
+ instanceConnectionInfo, id, registeredHostsById.size()));
+ }
+
+ host.reportHeartBeat();
+
+ // notify all listeners (for example the scheduler)
+ notifyNewInstance(host);
+
+ return id;
+ }
+ }
+
+ public int getNumberOfRegisteredTaskManagers() {
+ return this.registeredHostsById.size();
+ }
+
+ public int getTotalNumberOfSlots() {
+ return this.totalNumberOfAliveTaskSlots;
+ }
+
+ public Map<InstanceID, Instance> getAllRegisteredInstances() {
+ synchronized (this.lock) {
+ return new HashMap<InstanceID, Instance>(this.registeredHostsById);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public void addInstanceListener(InstanceListener listener) {
+ synchronized (this.instanceListeners) {
+ this.instanceListeners.add(listener);
+ }
+ }
+
+ public void removeInstanceListener(InstanceListener listener) {
+ synchronized (this.instanceListeners) {
+ this.instanceListeners.remove(listener);
+ }
+ }
+
+ private void notifyNewInstance(Instance instance) {
+ synchronized (this.instanceListeners) {
+ for (InstanceListener listener : this.instanceListeners) {
+ try {
+ listener.newInstanceAvailable(instance);
+ }
+ catch (Throwable t) {
+ LOG.error("Notification of new instance availability failed.", t);
+ }
+ }
+ }
+ }
+
+ private void notifyDeadInstance(Instance instance) {
+ synchronized (this.instanceListeners) {
+ for (InstanceListener listener : this.instanceListeners) {
+ try {
+ listener.instanceDied(instance);
+ }
+ catch (Throwable t) {
+ LOG.error("Notification of dead instance failed.", t);
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void checkForDeadInstances() {
+ final long now = System.currentTimeMillis();
+ final long timeout = InstanceManager.this.heartbeatTimeout;
+
+ synchronized (InstanceManager.this.lock) {
+ if (InstanceManager.this.shutdown) {
+ return;
+ }
+
+ final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
+
+ // check all hosts whether they did not send heart-beat messages.
+ while (entries.hasNext()) {
+
+ final Map.Entry<InstanceID, Instance> entry = entries.next();
+ final Instance host = entry.getValue();
+
+ if (!host.isStillAlive(now, timeout)) {
+
+ // remove from the living
+ entries.remove();
+ registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
+
+ // add to the dead
+ deadHosts.add(host.getInstanceConnectionInfo());
+
+ host.markDead();
+
+ totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
+
+ LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
+ host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
+
+ // report to all listeners
+ notifyDeadInstance(host);
+ }
+ }
+ }
+ }
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Periodic task that checks which hosts have failed to send their heart-beat
+ * messages in time and purges those hosts.
+ */
+ private final TimerTask cleanupStaleMachines = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ checkForDeadInstances();
+ }
+ catch (Throwable t) {
+ LOG.error("Checking for dead instances failed.", t);
+ }
+ }
+ };
}
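Taken together, the new InstanceManager is a concrete class rather than an interface: TaskManagers register once, keep themselves alive via heartbeats, and a timer-driven check purges those that time out. The following is a minimal usage sketch, under the assumptions that InstanceListener lives in org.apache.flink.runtime.instance and declares exactly the two callbacks invoked by the notify methods above, and that connection info and hardware description come from the TaskManager's registration message.

import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.InstanceManager;

public class InstanceManagerSketch {

    public static void run(InstanceConnectionInfo connection, HardwareDescription resources) {
        // 10 s heartbeat timeout; stale instances are purged at the same interval
        InstanceManager manager = new InstanceManager(10000, 10000);

        manager.addInstanceListener(new InstanceListener() {
            @Override
            public void newInstanceAvailable(Instance instance) {
                // e.g. offer the new slots to the scheduler
            }

            @Override
            public void instanceDied(Instance instance) {
                // e.g. fail and reschedule the tasks that ran on this instance
            }
        });

        // a TaskManager with 4 task slots announces itself
        InstanceID id = manager.registerTaskManager(connection, resources, 4);

        // periodic heartbeats keep it registered; returns false once it has been marked dead
        boolean stillKnown = manager.reportHeartBeat(id);

        System.out.println("registered TaskManagers: " + manager.getNumberOfRegisteredTaskManagers()
                + ", alive slots: " + manager.getTotalNumberOfSlots() + ", known: " + stillKnown);

        manager.shutdown();
    }
}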
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
index f8f41ae..832b8cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LocalInstanceManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
-
import java.util.ArrayList;
import java.util.List;
@@ -29,7 +28,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.taskmanager.TaskManager;
-public class LocalInstanceManager extends DefaultInstanceManager {
+/**
+ * A variant of the {@link InstanceManager} that internally spawns TaskManagers as instances, rather than waiting for external
+ * TaskManagers to register.
+ */
+public class LocalInstanceManager extends InstanceManager {
private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
@@ -55,7 +58,7 @@ public class LocalInstanceManager extends DefaultInstanceManager {
GlobalConfiguration.includeConfiguration(tm);
}
- taskManagers.add(new TaskManager(execMode));
+ taskManagers.add(TaskManager.createTaskManager(execMode));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index cd2e9ca..772a4f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.EOFException;
@@ -28,9 +27,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
-
/**
- * A {@link DataInputView} that is backed by a {@link BlockChannelReader}, making it effectively a data input
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link BlockChannelReader}, making it effectively a data input
 * stream. The view reads its data in blocks from the underlying channel. The view can only read data that
* has been written by a {@link ChannelWriterOutputView}, due to block formatting.
*/
@@ -245,8 +244,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
@Override
- protected int getLimitForSegment(MemorySegment segment)
- {
+ protected int getLimitForSegment(MemorySegment segment) {
return segment.getInt(ChannelWriterOutputView.HEAD_BLOCK_LENGTH_OFFSET);
}
@@ -257,8 +255,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
* @param seg The segment to use for the read request.
* @throws IOException Thrown, if the reader is in error.
*/
- protected void sendReadRequest(MemorySegment seg) throws IOException
- {
+ protected void sendReadRequest(MemorySegment seg) throws IOException {
if (this.numRequestsRemaining != 0) {
this.reader.readBlock(seg);
if (this.numRequestsRemaining != -1) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 06b49ae..c87c308 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.IOException;
@@ -27,15 +26,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
-
/**
- * A {@link DataOutputView} that is backed by a {@link BlockChannelWriter}, making it effectively a data output
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a
+ * {@link BlockChannelWriter}, making it effectively a data output
 * stream. The view writes its data in blocks to the underlying channel, adding a minimal header to each block.
* The data can be re-read by a {@link ChannelReaderInputView}, if it uses the same block size.
- *
*/
-public final class ChannelWriterOutputView extends AbstractPagedOutputView
-{
+public final class ChannelWriterOutputView extends AbstractPagedOutputView {
+
/**
* The magic number that identifies blocks as blocks from a ChannelWriterOutputView.
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
deleted file mode 100644
index c6a2ebe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Deserializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-/**
- * <p>
- * Provides a facility for deserializing objects of type <T> from an {@link InputStream}.
- * </p>
- * <p>
- * Deserializers are stateful, but must not buffer the input since other producers may read from the input between calls
- * to {@link #deserialize(Object)}.
- * </p>
- *
- * @param <T>
- */
-public interface Deserializer<T> {
- /**
- * <p>
- * Prepare the deserializer for reading.
- * </p>
- */
- void open(DataInput in) throws IOException;
-
- /**
- * <p>
- * Deserialize the next object from the underlying input stream. If the object <code>t</code> is non-null then this
- * deserializer <i>may</i> set its internal state to the next object read from the input stream. Otherwise, if the
- * object <code>t</code> is null a new deserialized object will be created.
- * </p>
- *
- * @return the deserialized object
- */
- T deserialize(T t) throws IOException;
-
- /**
- * <p>
- * Clear up any resources.
- * </p>
- */
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
index bd3665f..0147f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.disk.iomanager;
import java.io.EOFException;
@@ -25,9 +24,9 @@ import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
-
/**
- * A {@link DataInputView} that is backed by a {@link BlockChannelReader}, making it effectively a data input
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a
+ * {@link BlockChannelReader}, making it effectively a data input
* stream. This view is similar to the {@link ChannelReaderInputView}, but does not expect
* a header for each block, giving a direct stream abstraction over sequence of written
* blocks. It therefore requires specification of the number of blocks and the number of
@@ -73,8 +72,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
@Override
- protected MemorySegment nextSegment(MemorySegment current) throws IOException
- {
+ protected MemorySegment nextSegment(MemorySegment current) throws IOException {
// check for end-of-stream
if (this.numBlocksRemaining <= 0) {
this.reader.close();
@@ -94,8 +92,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
@Override
- protected int getLimitForSegment(MemorySegment segment)
- {
+ protected int getLimitForSegment(MemorySegment segment) {
return this.numBlocksRemaining > 0 ? segment.size() : this.lastBlockBytes;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 8fced32..fc39db2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -31,13 +31,10 @@ import org.apache.flink.core.memory.MemorySegment;
/**
* The facade for the provided I/O manager services.
- *
*/
-public final class IOManager implements UncaughtExceptionHandler
-{
- /**
- * Logging.
- */
+public class IOManager implements UncaughtExceptionHandler {
+
+ /** Logging */
private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
/**
@@ -85,7 +82,7 @@ public final class IOManager implements UncaughtExceptionHandler
/**
* Constructs a new IOManager.
*
- * @param path The base directory path for files underlying channels.
+ * @param tempDir The base directory path for files underlying channels.
*/
public IOManager(String tempDir) {
this(new String[] {tempDir});
@@ -94,12 +91,10 @@ public final class IOManager implements UncaughtExceptionHandler
/**
* Constructs a new IOManager.
*
- * @param path
- * the basic directory path for files underlying anonymous
- * channels.
+ * @param paths
+ * the basic directory paths for files underlying anonymous channels.
*/
- public IOManager(String[] paths)
- {
+ public IOManager(String[] paths) {
this.paths = paths;
this.random = new Random();
this.nextPath = 0;
@@ -199,7 +194,7 @@ public final class IOManager implements UncaughtExceptionHandler
@Override
public void uncaughtException(Thread t, Throwable e)
{
- LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+ LOG.fatal("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 3aae114..5571ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.AbstractID;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
@@ -124,7 +124,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
// Check if we can safely run this task with the given buffers
ensureBufferAvailability(task);
- RuntimeEnvironment environment = task.getRuntimeEnvironment();
+ RuntimeEnvironment environment = task.getEnvironment();
// -------------------------------------------------------------------------------------------------------------
// Register output channels
@@ -132,8 +132,8 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
environment.registerGlobalBufferPool(this.globalBufferPool);
- if (this.localBuffersPools.containsKey(task.getVertexID())) {
- throw new IllegalStateException("Vertex " + task.getVertexID() + " has a previous buffer pool owner");
+ if (this.localBuffersPools.containsKey(task.getExecutionId())) {
+ throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner");
}
for (OutputGate gate : environment.outputGates()) {
@@ -155,7 +155,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
}
- this.localBuffersPools.put(task.getVertexID(), environment);
+ this.localBuffersPools.put(task.getExecutionId(), environment);
// -------------------------------------------------------------------------------------------------------------
// Register input channels
@@ -187,10 +187,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
/**
* Unregisters the given task from the channel manager.
*
- * @param vertexId the ID of the task to be unregistered
+ * @param executionId the ID of the task to be unregistered
* @param task the task to be unregistered
*/
- public void unregister(ExecutionVertexID vertexId, Task task) {
+ public void unregister(ExecutionAttemptID executionId, Task task) {
final Environment environment = task.getEnvironment();
// destroy and remove OUTPUT channels from registered channels and cache
@@ -222,7 +222,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
// clear and remove OUTPUT side buffer pool
- LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(vertexId);
+ LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(executionId);
if (bufferPool != null) {
bufferPool.clearLocalBufferPool();
}
@@ -249,7 +249,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
// need at least one buffer per channel
- if (numBuffers / numChannels < 1) {
+ if (numChannels > 0 && numBuffers / numChannels < 1) {
String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
this.connectionInfo.hostname(), env.getTaskName(), numChannels - numBuffers);
@@ -582,10 +582,6 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
/**
*
* Upon an exception, this method frees the envelope.
- *
- * @param envelope
- * @return
- * @throws IOException
*/
private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
try {
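One behavioral fix in the ChannelManager hunks above is the guard in the buffer availability check: a task without any input or output channels previously led to a division by zero when computing buffers per channel. The following self-contained restatement of the guarded check uses names local to this sketch, not ChannelManager's actual fields.

public class BufferCheckSketch {

    /** Returns true if every channel of the task can get at least one buffer. */
    static boolean hasEnoughBuffers(int numBuffers, int numChannels) {
        // a task without channels needs no buffers; this also avoids dividing by zero
        return numChannels == 0 || numBuffers / numChannels >= 1;
    }

    public static void main(String[] args) {
        System.out.println(hasEnoughBuffers(64, 16)); // true: 4 buffers per channel
        System.out.println(hasEnoughBuffers(8, 16));  // false: fewer buffers than channels
        System.out.println(hasEnoughBuffers(0, 0));   // true: the unguarded division would throw here
    }
}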
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
index 238d1e7..c167093 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network.gates;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobID;
/**
- * In Nephele input gates are a specialization of general gates and connect input channels and record readers. As
+ * Input gates are a specialization of general gates and connect input channels and record readers. As
* channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to
* output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete
* wiring between two groups of vertices.
@@ -103,14 +103,12 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
@SuppressWarnings("unchecked")
public void initializeChannels(GateDeploymentDescriptor inputGateDescriptor){
- channels = new InputChannel[inputGateDescriptor.getNumberOfChannelDescriptors()];
-
- setChannelType(inputGateDescriptor.getChannelType());
-
- final int nicdd = inputGateDescriptor.getNumberOfChannelDescriptors();
+ List<ChannelDeploymentDescriptor> channelDescr = inputGateDescriptor.getChannels();
+
+ channels = new InputChannel[channelDescr.size()];
- for(int i = 0; i < nicdd; i++){
- final ChannelDeploymentDescriptor cdd = inputGateDescriptor.getChannelDescriptor(i);
+ for(int i = 0; i < channelDescr.size(); i++){
+ ChannelDeploymentDescriptor cdd = channelDescr.get(i);
channels[i] = new InputChannel<T>(this, i, cdd.getInputChannelID(),
cdd.getOutputChannelID(), getChannelType());
}