You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:27 UTC
[06/22] Rework the Taskmanager to a slot based model and remove
legacy cloud code
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
deleted file mode 100644
index e369613..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.jobmanager.scheduler;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-
-import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.util.SerializableHashSet;
-import eu.stratosphere.util.StringUtils;
-
-public final class RecoveryLogic {
-
- /**
- * The logger to report information and problems.
- */
- private static final Log LOG = LogFactory.getLog(RecoveryLogic.class);
-
- /**
- * Private constructor so class cannot be instantiated.
- */
- private RecoveryLogic() {
- }
-
- public static boolean recover(final ExecutionVertex failedVertex,
- final Map<ExecutionVertexID, ExecutionVertex> verticesToBeRestarted,
- final Set<ExecutionVertex> assignedVertices) {
-
- // Perform initial sanity check
- if (failedVertex.getExecutionState() != ExecutionState.FAILED) {
- LOG.error("Vertex " + failedVertex + " is requested to be recovered, but is not failed");
- return false;
- }
-
- final ExecutionGraph eg = failedVertex.getExecutionGraph();
- synchronized (eg) {
-
- LOG.info("Starting recovery for failed vertex " + failedVertex);
-
- final Set<ExecutionVertex> verticesToBeCanceled = new HashSet<ExecutionVertex>();
-
- findVerticesToRestart(failedVertex, verticesToBeCanceled);
-
- // Restart all predecessors without checkpoint
- final Iterator<ExecutionVertex> cancelIterator = verticesToBeCanceled.iterator();
- while (cancelIterator.hasNext()) {
-
- final ExecutionVertex vertex = cancelIterator.next();
-
- if (vertex.compareAndUpdateExecutionState(ExecutionState.FINISHED, getStateToUpdate(vertex))) {
- LOG.info("Vertex " + vertex + " has already finished and will not be canceled");
- if (vertex.getExecutionState() == ExecutionState.ASSIGNED) {
- assignedVertices.add(vertex);
- }
- continue;
- }
-
- LOG.info(vertex + " is canceled by recovery logic");
- verticesToBeRestarted.put(vertex.getID(), vertex);
- final TaskCancelResult cancelResult = vertex.cancelTask();
-
- if (cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS
- && cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.TASK_NOT_FOUND) {
-
- verticesToBeRestarted.remove(vertex.getID());
- LOG.error("Unable to cancel vertex" + cancelResult.getDescription());
- return false;
- }
- }
-
- LOG.info("Starting cache invalidation");
-
- // Invalidate the lookup caches
- if (!invalidateReceiverLookupCaches(failedVertex, verticesToBeCanceled)) {
- return false;
- }
-
- LOG.info("Cache invalidation complete");
-
- // Restart failed vertex
- failedVertex.updateExecutionState(getStateToUpdate(failedVertex));
- if (failedVertex.getExecutionState() == ExecutionState.ASSIGNED) {
- assignedVertices.add(failedVertex);
- }
- }
-
- return true;
- }
-
- static boolean hasInstanceAssigned(final ExecutionVertex vertex) {
-
- return !(vertex.getAllocatedResource().getInstance() instanceof DummyInstance);
- }
-
- private static ExecutionState getStateToUpdate(final ExecutionVertex vertex) {
-
- if (hasInstanceAssigned(vertex)) {
- return ExecutionState.ASSIGNED;
- }
-
- return ExecutionState.CREATED;
- }
-
- private static void findVerticesToRestart(final ExecutionVertex failedVertex,
- final Set<ExecutionVertex> verticesToBeCanceled) {
-
- final Queue<ExecutionVertex> verticesToTest = new ArrayDeque<ExecutionVertex>();
- final Set<ExecutionVertex> visited = new HashSet<ExecutionVertex>();
- verticesToTest.add(failedVertex);
-
- while (!verticesToTest.isEmpty()) {
-
- final ExecutionVertex vertex = verticesToTest.poll();
-
- // Predecessors must be either checkpoints or need to be restarted, too
- for (int j = 0; j < vertex.getNumberOfPredecessors(); j++) {
- final ExecutionVertex predecessor = vertex.getPredecessor(j);
-
- if (hasInstanceAssigned(predecessor)) {
- verticesToBeCanceled.add(predecessor);
- }
-
- if (!visited.contains(predecessor)) {
- verticesToTest.add(predecessor);
- }
- }
- visited.add(vertex);
- }
- }
-
- private static final boolean invalidateReceiverLookupCaches(final ExecutionVertex failedVertex,
- final Set<ExecutionVertex> verticesToBeCanceled) {
-
- final Map<AbstractInstance, Set<ChannelID>> entriesToInvalidate = new HashMap<AbstractInstance, Set<ChannelID>>();
-
- collectCacheEntriesToInvalidate(failedVertex, entriesToInvalidate);
- for (final Iterator<ExecutionVertex> it = verticesToBeCanceled.iterator(); it.hasNext();) {
- collectCacheEntriesToInvalidate(it.next(), entriesToInvalidate);
- }
-
- final Iterator<Map.Entry<AbstractInstance, Set<ChannelID>>> it = entriesToInvalidate.entrySet().iterator();
-
- while (it.hasNext()) {
-
- final Map.Entry<AbstractInstance, Set<ChannelID>> entry = it.next();
- final AbstractInstance instance = entry.getKey();
-
- try {
- instance.invalidateLookupCacheEntries(entry.getValue());
- } catch (IOException ioe) {
- LOG.error(StringUtils.stringifyException(ioe));
- return false;
- }
- }
-
- return true;
- }
-
- private static void collectCacheEntriesToInvalidate(final ExecutionVertex vertex,
- final Map<AbstractInstance, Set<ChannelID>> entriesToInvalidate) {
-
- final int numberOfOutputGates = vertex.getNumberOfOutputGates();
- for (int i = 0; i < numberOfOutputGates; ++i) {
-
- final ExecutionGate outputGate = vertex.getOutputGate(i);
- for (int j = 0; j < outputGate.getNumberOfEdges(); ++j) {
-
- final ExecutionEdge outputChannel = outputGate.getEdge(j);
-
- final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
- if (connectedVertex == null) {
- LOG.error("Connected vertex is null");
- continue;
- }
-
- final AbstractInstance instance = connectedVertex.getAllocatedResource().getInstance();
- if (instance instanceof DummyInstance) {
- continue;
- }
-
- Set<ChannelID> channelIDs = entriesToInvalidate.get(instance);
- if (channelIDs == null) {
- channelIDs = new SerializableHashSet<ChannelID>();
- entriesToInvalidate.put(instance, channelIDs);
- }
-
- channelIDs.add(outputChannel.getInputChannelID());
- }
- }
-
- for (int i = 0; i < vertex.getNumberOfInputGates(); ++i) {
-
- final ExecutionGate inputGate = vertex.getInputGate(i);
- for (int j = 0; j < inputGate.getNumberOfEdges(); ++j) {
-
- final ExecutionEdge inputChannel = inputGate.getEdge(j);
-
- final ExecutionVertex connectedVertex = inputChannel.getOutputGate().getVertex();
- if (connectedVertex == null) {
- LOG.error("Connected vertex is null");
- continue;
- }
-
- final AbstractInstance instance = connectedVertex.getAllocatedResource().getInstance();
- if (instance instanceof DummyInstance) {
- continue;
- }
-
- Set<ChannelID> channelIDs = entriesToInvalidate.get(instance);
- if (channelIDs == null) {
- channelIDs = new SerializableHashSet<ChannelID>();
- entriesToInvalidate.put(instance, channelIDs);
- }
-
- channelIDs.add(inputChannel.getOutputChannelID());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
deleted file mode 100644
index 9ae5635..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalExecutionListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.local;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractExecutionListener;
-
-/**
- * This is a wrapper class for the {@link LocalScheduler} to receive
- * notifications about state changes of vertices belonging
- * to scheduled jobs.
- * <p>
- * This class is thread-safe.
- *
- */
-public class LocalExecutionListener extends AbstractExecutionListener {
-
- public LocalExecutionListener(final LocalScheduler scheduler, final ExecutionVertex executionVertex) {
- super(scheduler, executionVertex);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
deleted file mode 100644
index b731965..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/local/LocalScheduler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.local;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.util.StringUtils;
-
-public class LocalScheduler extends AbstractScheduler implements JobStatusListener, ExecutionStageListener {
-
- /**
- * The job queue of the scheduler
- */
- private Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
-
- /**
- * Constructs a new local scheduler.
- *
- * @param deploymentManager
- * the deployment manager assigned to this scheduler
- * @param instanceManager
- * the instance manager to be used with this scheduler
- */
- public LocalScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
- super(deploymentManager, instanceManager);
- }
-
- void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-
- boolean removedFromQueue = false;
-
- synchronized (this.jobQueue) {
-
- final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
- while (it.hasNext()) {
-
- final ExecutionGraph executionGraph = it.next();
- // Field jobID of executionGraph is immutable, so no synchronization needed
- if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
- removedFromQueue = true;
- it.remove();
- break;
- }
-
- }
- }
-
- if (!removedFromQueue) {
- LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
- + executionGraphToRemove.getJobID() + ") to remove");
- }
-
- // TODO: Remove vertices from restart map
- }
-
-
- @Override
- public void schedulJob(final ExecutionGraph executionGraph) throws SchedulingException {
-
- // Get Map of all available Instance types
- final Map<InstanceType, InstanceTypeDescription> availableInstances = getInstanceManager()
- .getMapOfAvailableInstanceTypes();
-
- final Iterator<ExecutionStage> stageIt = executionGraph.iterator();
- while (stageIt.hasNext()) {
-
- final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
- final ExecutionStage stage = stageIt.next();
- stage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
- // Iterator over required Instances
- final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
- while (it.hasNext()) {
-
- final Map.Entry<InstanceType, Integer> entry = it.next();
-
- final InstanceTypeDescription descr = availableInstances.get(entry.getKey());
- if (descr == null) {
- throw new SchedulingException("Unable to schedule job: No instance of type " + entry.getKey()
- + " available");
- }
-
- if (descr.getMaximumNumberOfAvailableInstances() != -1
- && descr.getMaximumNumberOfAvailableInstances() < entry.getValue().intValue()) {
- throw new SchedulingException("Unable to schedule job: " + entry.getValue().intValue()
- + " instances of type " + entry.getKey() + " required, but only "
- + descr.getMaximumNumberOfAvailableInstances() + " are available");
- }
- }
- }
-
- // Subscribe to job status notifications
- executionGraph.registerJobStatusListener(this);
-
- // Set state of each vertex for scheduled
- final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
- while (it2.hasNext()) {
-
- final ExecutionVertex vertex = it2.next();
- vertex.registerExecutionListener(new LocalExecutionListener(this, vertex));
- }
-
- // Register the scheduler as an execution stage listener
- executionGraph.registerExecutionStageListener(this);
-
- // Add job to the job queue (important to add job to queue before requesting instances)
- synchronized (this.jobQueue) {
- this.jobQueue.add(executionGraph);
- }
-
- // Request resources for the first stage of the job
-
- final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
- try {
- requestInstances(executionStage);
- } catch (InstanceException e) {
- final String exceptionMessage = StringUtils.stringifyException(e);
- LOG.error(exceptionMessage);
- this.jobQueue.remove(executionGraph);
- throw new SchedulingException(exceptionMessage);
- }
- }
-
-
- @Override
- public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-
- synchronized (this.jobQueue) {
-
- final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
- while (it.hasNext()) {
-
- final ExecutionGraph executionGraph = it.next();
- if (executionGraph.getJobID().equals(jobID)) {
- return executionGraph;
- }
- }
- }
-
- return null;
- }
-
-
- @Override
- public void shutdown() {
-
- synchronized (this.jobQueue) {
- this.jobQueue.clear();
- }
-
- }
-
-
- @Override
- public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
- final String optionalMessage) {
-
- if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
- || newJobStatus == InternalJobStatus.CANCELED) {
- removeJobFromSchedule(executionGraph);
- }
- }
-
-
- @Override
- public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-
- // Request new instances if necessary
- try {
- requestInstances(executionStage);
- } catch (InstanceException e) {
- // TODO: Handle this error correctly
- LOG.error(StringUtils.stringifyException(e));
- }
-
- // Deploy the assigned vertices
- deployAssignedInputVertices(executionStage.getExecutionGraph());
-
- // Initialize the replay of the previous stage's checkpoints
- replayCheckpointsFromPreviousStage(executionStage.getExecutionGraph());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
deleted file mode 100644
index 1d37edc..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueExecutionListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.queue;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractExecutionListener;
-
-/**
- * This is a wrapper class for the {@link QueueScheduler} to receive
- * notifications about state changes of vertices belonging
- * to scheduled jobs.
- * <p>
- * This class is thread-safe.
- *
- */
-public final class QueueExecutionListener extends AbstractExecutionListener {
-
- /**
- * Constructs a new queue execution listener.
- *
- * @param scheduler
- * the scheduler this listener is connected with
- * @param executionVertex
- * the execution vertex this listener is created for
- */
- public QueueExecutionListener(final QueueScheduler scheduler, final ExecutionVertex executionVertex) {
- super(scheduler, executionVertex);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
deleted file mode 100644
index cd76f04..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/QueueScheduler.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager.scheduler.queue;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.Map;
-
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraphIterator;
-import eu.stratosphere.nephele.executiongraph.ExecutionStage;
-import eu.stratosphere.nephele.executiongraph.ExecutionStageListener;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.executiongraph.InternalJobStatus;
-import eu.stratosphere.nephele.executiongraph.JobStatusListener;
-import eu.stratosphere.nephele.instance.InstanceException;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.instance.InstanceRequestMap;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.DeploymentManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.jobmanager.scheduler.SchedulingException;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * The queue scheduler mains of queue of all submitted jobs and executes one job at a time.
- *
- */
-public class QueueScheduler extends AbstractScheduler implements JobStatusListener, ExecutionStageListener {
-
- /**
- * The job queue where all submitted jobs go to.
- */
- private Deque<ExecutionGraph> jobQueue = new ArrayDeque<ExecutionGraph>();
-
- /**
- * Constructs a new queue scheduler.
- *
- * @param deploymentManager
- * the deployment manager assigned to this scheduler
- * @param instanceManager
- * the instance manager to be used with this scheduler
- */
- public QueueScheduler(final DeploymentManager deploymentManager, final InstanceManager instanceManager) {
- super(deploymentManager, instanceManager);
- }
-
- /**
- * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
- *
- * @param executionGraphToRemove
- * the job to be removed
- */
- void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-
- boolean removedFromQueue = false;
-
- synchronized (this.jobQueue) {
-
- final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
- while (it.hasNext()) {
-
- final ExecutionGraph executionGraph = it.next();
- if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
- removedFromQueue = true;
- it.remove();
- break;
- }
- }
- }
-
- if (!removedFromQueue) {
- LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
- + executionGraphToRemove.getJobID() + ") to remove");
- }
- }
-
-
- @Override
- public void schedulJob(final ExecutionGraph executionGraph) throws SchedulingException {
-
- // Get Map of all available Instance types
- final Map<InstanceType, InstanceTypeDescription> availableInstances = getInstanceManager()
- .getMapOfAvailableInstanceTypes();
-
- final Iterator<ExecutionStage> stageIt = executionGraph.iterator();
- while (stageIt.hasNext()) {
-
- final InstanceRequestMap instanceRequestMap = new InstanceRequestMap();
- final ExecutionStage stage = stageIt.next();
- stage.collectRequiredInstanceTypes(instanceRequestMap, ExecutionState.CREATED);
-
- // Iterator over required Instances
- final Iterator<Map.Entry<InstanceType, Integer>> it = instanceRequestMap.getMinimumIterator();
- while (it.hasNext()) {
-
- final Map.Entry<InstanceType, Integer> entry = it.next();
-
- final InstanceTypeDescription descr = availableInstances.get(entry.getKey());
- if (descr == null) {
- throw new SchedulingException("Unable to schedule job: No instance of type " + entry.getKey()
- + " available");
- }
-
- if (descr.getMaximumNumberOfAvailableInstances() != -1
- && descr.getMaximumNumberOfAvailableInstances() < entry.getValue().intValue()) {
- throw new SchedulingException("Unable to schedule job: " + entry.getValue().intValue()
- + " instances of type " + entry.getKey() + " required, but only "
- + descr.getMaximumNumberOfAvailableInstances() + " are available");
- }
- }
- }
-
- // Subscribe to job status notifications
- executionGraph.registerJobStatusListener(this);
-
- // Register execution listener for each vertex
- final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
- while (it2.hasNext()) {
-
- final ExecutionVertex vertex = it2.next();
- vertex.registerExecutionListener(new QueueExecutionListener(this, vertex));
- }
-
- // Register the scheduler as an execution stage listener
- executionGraph.registerExecutionStageListener(this);
-
- // Add job to the job queue (important to add job to queue before requesting instances)
- synchronized (this.jobQueue) {
- this.jobQueue.add(executionGraph);
- }
-
- // Request resources for the first stage of the job
-
- final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
- try {
- requestInstances(executionStage);
- } catch (InstanceException e) {
- final String exceptionMessage = StringUtils.stringifyException(e);
- LOG.error(exceptionMessage);
- this.jobQueue.remove(executionGraph);
- throw new SchedulingException(exceptionMessage);
- }
- }
-
-
- @Override
- public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-
- synchronized (this.jobQueue) {
-
- final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
- while (it.hasNext()) {
-
- final ExecutionGraph executionGraph = it.next();
- if (executionGraph.getJobID().equals(jobID)) {
- return executionGraph;
- }
- }
- }
-
- return null;
- }
-
-
- @Override
- public void shutdown() {
-
- synchronized (this.jobQueue) {
- this.jobQueue.clear();
- }
-
- }
-
-
- @Override
- public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
- final String optionalMessage) {
-
- if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
- || newJobStatus == InternalJobStatus.CANCELED) {
- removeJobFromSchedule(executionGraph);
- }
- }
-
-
- @Override
- public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-
- // Request new instances if necessary
- try {
- requestInstances(executionStage);
- } catch (InstanceException e) {
- // TODO: Handle error correctly
- LOG.error(StringUtils.stringifyException(e));
- }
-
- // Deploy the assigned vertices
- deployAssignedInputVertices(executionStage.getExecutionGraph());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index eea78d8..bbef991 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -37,7 +37,7 @@ import eu.stratosphere.util.StringUtils;
/**
* The input split manager is responsible for serving input splits to {@link AbstractInputTask} objects at runtime.
- * Before passed on to the {@link AbstractScheduler}, an {@link ExecutionGraph} is registered with the input split
+ * Before passed on to the {@link eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler}, an {@link ExecutionGraph} is registered with the input split
* manager and all included input vertices of the graph register their generated input splits with the manager. Each
* type of input split can be assigned to a specific {@link InputSplitAssigner} which is loaded by the input split
* manager at runtime.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 85df81a..3717fbf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -16,6 +16,7 @@ package eu.stratosphere.nephele.jobmanager.splitassigner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import eu.stratosphere.nephele.instance.Instance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -23,7 +24,6 @@ import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.core.io.LocatableInputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
@@ -115,7 +115,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
return null;
}
- final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+ final Instance instance = vertex.getAllocatedResource().getInstance();
if (instance == null) {
LOG.error("Instance is null, returning random split");
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
index c830a6f..7647fae 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitList.java
@@ -21,16 +21,16 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
+import eu.stratosphere.nephele.instance.Instance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.core.io.LocatableInputSplit;
-import eu.stratosphere.nephele.instance.AbstractInstance;
/**
* The locatable input split list stores the locatable input splits for an input vertex that are still expected to be
* consumed. Besides simply storing the splits, the locatable input split list also computes the distance all
- * {@link AbstractInstance} objects which request an input split and its nearest storage location with respect to the
+ * {@link eu.stratosphere.nephele.instance.Instance} objects which request an input split and its nearest storage location with respect to the
* underlying network topology. That way input splits are always given to consuming vertices in a way that data locality
* is preserved as well as possible.
* <p>
@@ -50,13 +50,13 @@ public final class LocatableInputSplitList {
private Set<LocatableInputSplit> masterSet = new HashSet<LocatableInputSplit>();
/**
- * The map caching the specific file input split lists for each {@link AbstractInstance}.
+ * The map caching the specific file input split lists for each {@link eu.stratosphere.nephele.instance.Instance}.
*/
- private Map<AbstractInstance, Queue<QueueElem>> instanceMap = new HashMap<AbstractInstance, Queue<QueueElem>>();
+ private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
/**
* This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
- * {@link AbstractInstance}.
+ * {@link eu.stratosphere.nephele.instance.Instance}.
*
*/
private final class QueueElem implements Comparable<QueueElem> {
@@ -120,7 +120,7 @@ public final class LocatableInputSplitList {
/**
* Returns the next locatable input split to be consumed by the given instance. The returned input split is selected
* in a
- * way that the distance between the split's storage location and the requesting {@link AbstractInstance} is as
+ * way that the distance between the split's storage location and the requesting {@link eu.stratosphere.nephele.instance.Instance} is as
* short as possible.
*
* @param instance
@@ -128,7 +128,7 @@ public final class LocatableInputSplitList {
* @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
* already been consumed.
*/
- synchronized LocatableInputSplit getNextInputSplit(final AbstractInstance instance) {
+ synchronized LocatableInputSplit getNextInputSplit(final Instance instance) {
final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
@@ -157,16 +157,16 @@ public final class LocatableInputSplitList {
}
/**
- * Returns a list of locatable input splits specifically ordered for the given {@link AbstractInstance}. When the
+ * Returns a list of locatable input splits specifically ordered for the given {@link eu.stratosphere.nephele.instance.Instance}. When the
* list is initially created, it contains all the unconsumed located input splits at that point in time, ascendingly
* ordered
- * by the minimum distance between the input splits' storage locations and the given {@link AbstractInstance}.
+ * by the minimum distance between the input splits' storage locations and the given {@link eu.stratosphere.nephele.instance.Instance}.
*
* @param instance
* the instance for which the locatable input split list has been computed
* @return the list of file input splits ordered specifically for the given instance
*/
- private Queue<QueueElem> getInstanceSplitList(final AbstractInstance instance) {
+ private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
if (instanceSplitList == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 938fb48..7894334 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -16,6 +16,7 @@ package eu.stratosphere.nephele.jobmanager.splitassigner.file;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import eu.stratosphere.nephele.instance.Instance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -23,7 +24,6 @@ import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
@@ -117,7 +117,7 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
return null;
}
- final AbstractInstance instance = vertex.getAllocatedResource().getInstance();
+ final Instance instance = vertex.getAllocatedResource().getInstance();
if (instance == null) {
LOG.error("Instance is null, returning random split");
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
index db84a91..ae9898a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitList.java
@@ -21,15 +21,15 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
+import eu.stratosphere.nephele.instance.Instance;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.nephele.instance.AbstractInstance;
/**
* The file input split list stores the file input splits for an input vertex that are still expected to be consumed.
- * Besides simply storing the splits, the file input split list also computes the distance all {@link AbstractInstance}
+ * Besides simply storing the splits, the file input split list also computes the distance all {@link eu.stratosphere.nephele.instance.Instance}
* objects which request a input split and its nearest storage location with respect to the underlying network topology.
* That way input splits are always given to consuming vertices in a way that data locality is preserved as well as
* possible.
@@ -50,13 +50,13 @@ public final class FileInputSplitList {
private Set<FileInputSplit> masterSet = new HashSet<FileInputSplit>();
/**
- * The map caching the specific file input split lists for each {@link AbstractInstance}.
+ * The map caching the specific file input split lists for each {@link eu.stratosphere.nephele.instance.Instance}.
*/
- private Map<AbstractInstance, Queue<QueueElem>> instanceMap = new HashMap<AbstractInstance, Queue<QueueElem>>();
+ private Map<Instance, Queue<QueueElem>> instanceMap = new HashMap<Instance, Queue<QueueElem>>();
/**
* This is an auxiliary class to store the minimum distance between a file input split's storage locations and an
- * {@link AbstractInstance}.
+ * {@link eu.stratosphere.nephele.instance.Instance}.
*
*/
private final class QueueElem implements Comparable<QueueElem> {
@@ -119,7 +119,7 @@ public final class FileInputSplitList {
/**
* Returns the next file input split to be consumed by the given instance. The returned input split is selected in a
- * way that the distance between the split's storage location and the requesting {@link AbstractInstance} is as
+ * way that the distance between the split's storage location and the requesting {@link eu.stratosphere.nephele.instance.Instance} is as
* short as possible.
*
* @param instance
@@ -127,7 +127,7 @@ public final class FileInputSplitList {
* @return the next input split to be consumed by the given instance or <code>null</code> if all input splits have
* already been consumed.
*/
- synchronized FileInputSplit getNextInputSplit(final AbstractInstance instance) {
+ synchronized FileInputSplit getNextInputSplit(final Instance instance) {
final Queue<QueueElem> instanceSplitList = getInstanceSplitList(instance);
@@ -156,15 +156,15 @@ public final class FileInputSplitList {
}
/**
- * Returns a list of file input splits specifically ordered for the given {@link AbstractInstance}. When the list is
+ * Returns a list of file input splits specifically ordered for the given {@link eu.stratosphere.nephele.instance.Instance}. When the list is
* initially created, it contains all the unconsumed file input splits at that point in time, ascendingly ordered by
- * the minimum distance between the input splits' storage locations and the given {@link AbstractInstance}.
+ * the minimum distance between the input splits' storage locations and the given {@link eu.stratosphere.nephele.instance.Instance}.
*
* @param instance
* the instance for which the file input split list has been computed
* @return the list of file input splits ordered specifically for the given instance
*/
- private Queue<QueueElem> getInstanceSplitList(final AbstractInstance instance) {
+ private Queue<QueueElem> getInstanceSplitList(final Instance instance) {
Queue<QueueElem> instanceSplitList = this.instanceMap.get(instance);
if (instanceSplitList == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
index 374656b..fab720d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
@@ -445,9 +445,8 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
groupVertexID.read(in);
final ManagementGroupVertex groupVertex = this.getGroupVertexByID(groupVertexID);
final String instanceName = StringRecord.readString(in);
- final String instanceType = StringRecord.readString(in);
final int indexInGroup = in.readInt();
- final ManagementVertex vertex = new ManagementVertex(groupVertex, vertexID, instanceName, instanceType, indexInGroup);
+ final ManagementVertex vertex = new ManagementVertex(groupVertex, vertexID, instanceName, indexInGroup);
vertex.read(in);
}
@@ -523,7 +522,6 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
managementVertex.getID().write(out);
managementVertex.getGroupVertex().getID().write(out);
StringRecord.writeString(out, managementVertex.getInstanceName());
- StringRecord.writeString(out, managementVertex.getInstanceType());
out.writeInt(managementVertex.getIndexInGroup());
managementVertex.write(out);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
index 639b1e9..eaececc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
@@ -65,11 +65,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
private String instanceName;
/**
- * The type of the instance the vertex represented by this management vertex currently runs on.
- */
- private String instanceType;
-
- /**
* The index of this vertex in the management group vertex it belongs to.
*/
private final int indexInGroup;
@@ -88,19 +83,14 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
* the ID of the new management vertex
* @param instanceName
* the name of the instance the vertex represented by this new management vertex currently runs on
- * @param instanceType
- * the type of the instance the vertex represented by this new management vertex currently runs on
- * @param checkpointState
- * the state of the vertex's checkpoint
* @param indexInGroup
* the index of this vertex in the management group vertex it belongs to
*/
public ManagementVertex(final ManagementGroupVertex groupVertex, final ManagementVertexID id,
- final String instanceName, final String instanceType, final int indexInGroup) {
+ final String instanceName, final int indexInGroup) {
this.groupVertex = groupVertex;
this.id = id;
this.instanceName = instanceName;
- this.instanceType = instanceType;
this.indexInGroup = indexInGroup;
@@ -132,15 +122,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
}
/**
- * Returns the type of the instance the vertex represented by this management vertex currently runs on.
- *
- * @return the type of the instance the vertex represented by this management vertex currently runs on
- */
- public String getInstanceType() {
- return this.instanceType;
- }
-
- /**
* Returns the number of input gates this management vertex contains.
*
* @return the number of input gates this management vertex contains
@@ -276,16 +257,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
this.instanceName = instanceName;
}
- /**
- * Sets the type of instance this vertex currently runs on.
- *
- * @param instanceType
- * the type of instance this vertex currently runs on
- */
- public void setInstanceType(final String instanceType) {
- this.instanceType = instanceType;
- }
-
public void setOptMessage(final String optMessage) {
this.optMessage = optMessage;
}
@@ -294,7 +265,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
return this.optMessage;
}
-
@Override
public void read(final DataInput in) throws IOException {
@@ -314,7 +284,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
}
this.instanceName = StringRecord.readString(in);
- this.instanceType = StringRecord.readString(in);
}
@@ -331,7 +300,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
out.writeInt(this.outputGates.size());
StringRecord.writeString(out, this.instanceName);
- StringRecord.writeString(out, this.instanceType);
}
@Override
@@ -351,7 +319,6 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
json.append("\"vertexname\": \"" + StringUtils.escapeHtml(this.toString()) + "\",");
json.append("\"vertexstatus\": \"" + this.getExecutionState() + "\",");
json.append("\"vertexinstancename\": \"" + this.getInstanceName() + "\",");
- json.append("\"vertexinstancetype\": \"" + this.getInstanceType() + "\"");
json.append("}");
return json.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
index 35979dd..cb08c3a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/net/NetUtils.java
@@ -221,6 +221,7 @@ public class NetUtils {
* @return InputStream for reading from the socket.
* @throws IOException
*/
+ @SuppressWarnings("resource")
public static InputStream getInputStream(Socket socket, long timeout) throws IOException {
return (socket.getChannel() == null) ? socket.getInputStream() : new SocketInputStream(socket, timeout);
}
@@ -266,6 +267,7 @@ public class NetUtils {
* @return OutputStream for writing to the socket.
* @throws IOException
*/
+ @SuppressWarnings("resource")
public static OutputStream getOutputStream(Socket socket, long timeout) throws IOException {
return (socket.getChannel() == null) ? socket.getOutputStream() : new SocketOutputStream(socket, timeout);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
index 7d0c980..81b4134 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/JobProfilingData.java
@@ -23,7 +23,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.AbstractInstance;
+import eu.stratosphere.nephele.instance.Instance;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
@@ -75,7 +75,7 @@ public class JobProfilingData {
public InstanceSummaryProfilingEvent getInstanceSummaryProfilingData(long timestamp) {
- final Set<AbstractInstance> tempSet = new HashSet<AbstractInstance>();
+ final Set<Instance> tempSet = new HashSet<Instance>();
// First determine the number of allocated instances in the current stage
final ExecutionGroupVertexIterator it = new ExecutionGroupVertexIterator(this.executionGraph, true,
this.executionGraph.getIndexOfCurrentExecutionStage());
@@ -84,7 +84,7 @@ public class JobProfilingData {
final ExecutionGroupVertex groupVertex = it.next();
for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); i++) {
final ExecutionVertex executionVertex = groupVertex.getGroupMember(i);
- final AbstractInstance instance = executionVertex.getAllocatedResource().getInstance();
+ final Instance instance = executionVertex.getAllocatedResource().getInstance();
if(!(instance instanceof DummyInstance)) {
tempSet.add(instance);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index 59ec15d..c731285 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -15,13 +15,10 @@ package eu.stratosphere.nephele.protocols;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
@@ -104,19 +101,6 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
void killInstance(StringRecord instanceName) throws IOException;
/**
- * Returns a map of all instance types which are currently available to Nephele. The map contains a description of
- * the hardware characteristics for each instance type as provided in the configuration file. Moreover, it contains
- * the actual hardware description as reported by task managers running on the individual instances. If available,
- * the map also contains the maximum number instances Nephele can allocate of each instance type (i.e. if no other
- * job occupies instances).
- *
- * @return a list of all instance types available to Nephele
- * @throws IOException
- * thrown if an error occurs while transmitting the list
- */
- Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() throws IOException;
-
- /**
* Triggers all task managers involved in processing the job with the given job ID to write the utilization of
* their read and write buffers to their log files. This method is primarily for debugging purposes.
*
@@ -126,4 +110,11 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
* throws if an error occurs while transmitting the request
*/
void logBufferUtilization(JobID jobID) throws IOException;
+
+ /**
+ * Returns the number of available slots among the registered task managers.
+ * @return the number of available slots
+ * @throws IOException thrown if an error occurs while transmitting the request
+ */
+ int getAvailableSlots() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
index 8cd5e26..5070b51 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
@@ -19,6 +19,8 @@ import eu.stratosphere.core.protocols.VersionedProtocol;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
+import eu.stratosphere.nephele.types.IntegerRecord;
/**
* The job manager protocol is implemented by the job manager and offers functionality
@@ -33,12 +35,23 @@ public interface JobManagerProtocol extends VersionedProtocol {
*
* @param instanceConnectionInfo
* the information the job manager requires to connect to the instance's task manager
- * @param hardwareDescription
- * a hardware description with details on the instance's compute resources.
* @throws IOException
* thrown if an error occurs during this remote procedure call
*/
- void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription)
+ void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo)
+ throws IOException;
+
+ /**
+ * Registers a task manager at the JobManager.
+ *
+ * @param instanceConnectionInfo the information the job manager requires to connect to the instance's task manager
+ * @param hardwareDescription a hardware description with details on the instance's compute resources.
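+ * @param numberOfSlots the number of slots of the task manager being registered
+ * @return the result of the registration, indicating whether the task manager was successfully registered
+ * @throws IOException thrown if an error occurs during this remote procedure call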
+ */
+ RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
+ HardwareDescription hardwareDescription,IntegerRecord numberOfSlots)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
index ccbc64a..85432eb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/ChannelAccess.java
@@ -75,6 +75,7 @@ public abstract class ChannelAccess<T, R extends IORequest>
this.requestQueue = requestQueue;
try {
+ @SuppressWarnings("resource")
RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
this.fileChannel = file.getChannel();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
index a8fe096..8b20c75 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
@@ -69,16 +69,29 @@ public interface MemoryManager {
* @return The size of the pages handled by the memory manager.
*/
int getPageSize();
+
+ /**
+ * Returns the total size of the memory managed by this memory manager.
+ * @return The total size of the managed memory.
+ */
+ long getMemorySize();
/**
* Computes to how many pages the given number of bytes corresponds. If the given number of bytes is not an
* exact multiple of a page size, the result is rounded down, such that a portion of the memory (smaller
* than the page size) is not included.
*
- * @param numBytes The number of bytes to convert to a page count.
+ * @param fraction the fraction of the total memory per slot
* @return The number of pages to which
*/
- int computeNumberOfPages(long numBytes);
+ int computeNumberOfPages(double fraction);
+
+ /**
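+ * Computes the memory size that corresponds to the given fraction of the memory per slot.
+ * @param fraction the fraction of the total memory per slot
+ * @return The memory size that corresponds to the given fraction.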
+ */
+ long computeMemorySize(double fraction);
/**
* Rounds the given value down to a multiple of the memory manager's page size.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
index 8bc7b13..d4a2b36 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
@@ -66,6 +66,13 @@ public class DefaultMemoryManager implements MemoryManager {
private boolean isShutDown; // flag whether the close() has already been invoked.
+ /**
+ * Number of slots of the task manager
+ */
+ private final int numberOfSlots;
+
+ private final long memorySize;
+
// ------------------------------------------------------------------------
// Constructors / Destructors
// ------------------------------------------------------------------------
@@ -75,8 +82,8 @@ public class DefaultMemoryManager implements MemoryManager {
*
* @param memorySize The total size of the memory to be managed by this memory manager.
*/
- public DefaultMemoryManager(long memorySize) {
- this(memorySize, DEFAULT_PAGE_SIZE);
+ public DefaultMemoryManager(long memorySize, int numberOfSlots) {
+ this(memorySize, numberOfSlots, DEFAULT_PAGE_SIZE);
}
/**
@@ -85,7 +92,7 @@ public class DefaultMemoryManager implements MemoryManager {
* @param memorySize The total size of the memory to be managed by this memory manager.
* @param pageSize The size of the pages handed out by the memory manager.
*/
- public DefaultMemoryManager(long memorySize, int pageSize) {
+ public DefaultMemoryManager(long memorySize, int numberOfSlots, int pageSize) {
// sanity checks
if (memorySize <= 0) {
throw new IllegalArgumentException("Size of total memory must be positive.");
@@ -97,6 +104,10 @@ public class DefaultMemoryManager implements MemoryManager {
// not a power of two
throw new IllegalArgumentException("The given page size is not a power of two.");
}
+
+ this.memorySize = memorySize;
+
+ this.numberOfSlots = numberOfSlots;
// assign page size and bit utilities
this.pageSize = pageSize;
@@ -348,8 +359,18 @@ public class DefaultMemoryManager implements MemoryManager {
}
@Override
- public int computeNumberOfPages(long numBytes) {
- return getNumPages(numBytes);
+ public long getMemorySize() {
+ return this.memorySize; // the memorySize value handed to the constructor, returned unchanged
+ }
+
+ @Override
+ public int computeNumberOfPages(double fraction) { // fraction is relative to one slot's share of the pages, not an absolute byte count
+ return getRelativeNumPages(fraction); // totalNumPages * fraction / numberOfSlots, truncated to int; a negative fraction raises IllegalArgumentException
+ }
+
+ /**
+  * Computes the amount of memory, in bytes, covered by the pages that correspond to the given
+  * fraction. The result is always a whole multiple of the page size.
+  *
+  * @param fraction The relative share to convert; interpreted exactly as in
+  *                 {@link #computeNumberOfPages(double)}.
+  * @return The page size multiplied by the number of pages computed for the fraction.
+  * @throws IllegalArgumentException Thrown by the page computation if the fraction is negative.
+  */
+ @Override
+ public long computeMemorySize(double fraction) {
+     // Widen to long BEFORE multiplying: the page size and the page count are both ints, so an
+     // int multiplication would wrap around for results of 2 GiB and more.
+     return ((long) this.pageSize) * computeNumberOfPages(fraction);
}
@Override
@@ -371,6 +392,14 @@ public class DefaultMemoryManager implements MemoryManager {
throw new IllegalArgumentException("The given number of bytes correstponds to more than MAX_INT pages.");
}
}
+
+ /**
+  * Converts a relative memory share into a number of pages. The share of a single slot is the
+  * total number of pages divided by the number of slots; the given fraction is applied to that
+  * share and the result is truncated towards zero.
+  *
+  * @param fraction The non-negative fraction of one slot's pages to compute.
+  * @return The number of pages, i.e. {@code totalNumPages * fraction / numberOfSlots} cast to int.
+  * @throws IllegalArgumentException Thrown if the fraction is negative or not a number.
+  */
+ private int getRelativeNumPages(double fraction) {
+     if (Double.isNaN(fraction)) {
+         // NaN is not "< 0", so it would pass the check below and be cast to zero pages silently.
+         throw new IllegalArgumentException("The fraction of memory to allocate must be a number.");
+     }
+     if (fraction < 0) {
+         throw new IllegalArgumentException("The fraction of memory to allocate must not be negative.");
+     }
+
+     // NOTE(review): numberOfSlots does not appear to be validated where it is assigned; a value
+     // of 0 would make this division infinite and the cast saturate - confirm callers pass >= 1.
+     return (int) (this.totalNumPages * fraction / this.numberOfSlots);
+ }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index ef0f6ab..5966cf9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -45,6 +45,10 @@ import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.runtime.io.network.LocalConnectionManager;
import eu.stratosphere.runtime.io.network.NetworkConnectionManager;
import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
+import eu.stratosphere.nephele.instance.Hardware;
+import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
+import eu.stratosphere.nephele.types.IntegerRecord;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -148,7 +152,9 @@ public class TaskManager implements TaskOperationProtocol {
private final IOManager ioManager;
- private static HardwareDescription hardwareDescription = null;
+ private final HardwareDescription hardwareDescription;
+
+ private final int numberOfSlots;
private final Thread heartbeatThread;
@@ -156,10 +162,10 @@ public class TaskManager implements TaskOperationProtocol {
/** Stores whether the task manager has already been shut down. */
private volatile boolean shutdownComplete;
-
+
/**
* Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
- * receive an initial configuration. All parameters are obtained from the
+ * receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager(ExecutionMode executionMode) throws Exception {
@@ -169,30 +175,31 @@ public class TaskManager implements TaskOperationProtocol {
LOG.info("Execution mode: " + executionMode);
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
-
+
final InetSocketAddress jobManagerAddress;
{
LOG.info("Reading location of job manager from configuration");
-
+
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
-
+
if (address == null) {
throw new Exception("Job manager address not configured in the GlobalConfiguration.");
}
-
+
// Try to convert configured address to {@link InetAddress}
try {
final InetAddress tmpAddress = InetAddress.getByName(address);
jobManagerAddress = new InetSocketAddress(tmpAddress, port);
- } catch (UnknownHostException e) {
+ }
+ catch (UnknownHostException e) {
LOG.fatal("Could not resolve JobManager host name.");
throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
}
-
+
LOG.info("Connecting to JobManager at: " + jobManagerAddress);
}
-
+
// Create RPC connection to the JobManager
try {
this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
@@ -200,7 +207,7 @@ public class TaskManager implements TaskOperationProtocol {
LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
}
-
+
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
if (ipcPort == -1) {
@@ -209,16 +216,17 @@ public class TaskManager implements TaskOperationProtocol {
if (dataPort == -1) {
dataPort = getAvailablePort();
}
-
+
// Determine our own public facing address and start the server
{
final InetAddress taskManagerAddress;
try {
taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
throw new RuntimeException("The TaskManager failed to determine its own network address.", e);
}
-
+
this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
@@ -231,7 +239,7 @@ public class TaskManager implements TaskOperationProtocol {
throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
}
}
-
+
// Try to create local stub of the global input split provider
try {
this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
@@ -258,21 +266,19 @@ public class TaskManager implements TaskOperationProtocol {
// Load profiler if it should be used
if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
-
+
final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY,
- "eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
-
+ "eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
+
this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(),
- this.localInstanceConnectionInfo);
-
+ this.localInstanceConnectionInfo);
+
if (this.profiler == null) {
LOG.error("Cannot find class name for the profiler.");
- }
- else {
+ } else {
LOG.info("Profiling of jobs is enabled.");
}
- }
- else {
+ } else {
this.profiler = null;
LOG.info("Profiling of jobs is disabled.");
}
@@ -282,10 +288,11 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
checkTempDirs(tmpDirPaths);
-
+
final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
+ // Initialize network buffer pool
int numBuffers = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
@@ -333,6 +340,8 @@ public class TaskManager implements TaskOperationProtocol {
{
HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
+ numberOfSlots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+ Hardware.getNumberCPUCores());
// Check whether the memory size has been explicitly configured. if so that overrides the default mechanism
// of taking as much as is mentioned in the hardware description
@@ -341,29 +350,30 @@ public class TaskManager implements TaskOperationProtocol {
if (memorySize > 0) {
// manually configured memory size. override the value in the hardware config
resources = HardwareDescriptionFactory.construct(resources.getNumberOfCPUCores(),
- resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
+ resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
}
this.hardwareDescription = resources;
// Initialize the memory manager
LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " +
"Page size is " + pageSize + " bytes.");
-
+
try {
@SuppressWarnings("unused")
final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
-
- this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), pageSize);
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
+
+ this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), this.numberOfSlots,
+ pageSize);
} catch (Throwable t) {
LOG.fatal("Unable to initialize memory manager with " + (resources.getSizeOfFreeMemory() >>> 20)
- + " megabytes of memory.", t);
+ + " megabytes of memory.", t);
throw new Exception("Unable to initialize memory manager.", t);
}
}
this.ioManager = new IOManager(tmpDirPaths);
-
+
this.heartbeatThread = new Thread() {
@Override
public void run() {
@@ -510,19 +520,33 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
- while (!shutdownStarted.get()) {
- // send heart beat
- try {
- LOG.debug("heartbeat");
- this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo, this.hardwareDescription);
- } catch (IOException e) {
- if (shutdownStarted.get()) {
+ try {
+ while(!shutdownStarted.get()){
+ RegisterTaskManagerResult result = this.jobManager.registerTaskManager(this
+ .localInstanceConnectionInfo,this.hardwareDescription,
+ new IntegerRecord(this.numberOfSlots));
+
+ if(result.getReturnCode() == RegisterTaskManagerResult.ReturnCode.SUCCESS){
break;
- } else {
- LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
+ }
+
+ try{
+ Thread.sleep(50);
+ }catch(InterruptedException e){
+ if (!shutdownStarted.get()) {
+ LOG.error("TaskManager register task manager loop was interrupted without shutdown.");
+ }
}
}
-
+
+ } catch (IOException e) {
+ if(!shutdownStarted.get()){
+ LOG.error("Registering task manager caused an exception: " + e.getMessage(), e);
+ }
+ return;
+ }
+
+ while (!shutdownStarted.get()) {
// sleep until the next heart beat
try {
Thread.sleep(interval);
@@ -532,9 +556,22 @@ public class TaskManager implements TaskOperationProtocol {
LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
}
}
+
+ // send heart beat
+ try {
+ LOG.debug("heartbeat");
+ this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo);
+ } catch (IOException e) {
+ if (shutdownStarted.get()) {
+ break;
+ } else {
+ LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
+ }
+ }
}
}
+
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
new file mode 100644
index 0000000..b396edd
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/RegisterTaskManagerResult.java
@@ -0,0 +1,50 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.taskmanager.transferenvelope;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.util.EnumUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Result object returned by the job manager's {@code registerTaskManager} call. It carries a
+ * single {@link ReturnCode} and is written to and read from a binary stream via
+ * {@link IOReadableWritable}.
+ */
+public class RegisterTaskManagerResult implements IOReadableWritable {
+
+    /** The possible outcomes of a registration attempt. */
+    public enum ReturnCode {
+        SUCCESS,
+        FAILURE
+    }
+
+    /** The outcome carried by this result; replaced when {@link #read(DataInput)} is invoked. */
+    private ReturnCode returnCode;
+
+    /**
+     * Creates a result whose return code is {@link ReturnCode#SUCCESS}.
+     */
+    public RegisterTaskManagerResult() {
+        this(ReturnCode.SUCCESS);
+    }
+
+    /**
+     * Creates a result with the given return code.
+     *
+     * @param returnCode The outcome to report.
+     */
+    public RegisterTaskManagerResult(ReturnCode returnCode) {
+        this.returnCode = returnCode;
+    }
+
+    /**
+     * Returns the outcome carried by this result.
+     *
+     * @return The current return code.
+     */
+    public ReturnCode getReturnCode() {
+        return this.returnCode;
+    }
+
+    /**
+     * Serializes the return code to the given output using {@code EnumUtils.writeEnum}.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        EnumUtils.writeEnum(out, this.returnCode);
+    }
+
+    /**
+     * Replaces the return code with the value read from the given input using
+     * {@code EnumUtils.readEnum}.
+     */
+    @Override
+    public void read(DataInput in) throws IOException {
+        this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
index 09df691..9f6542b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/topology/NetworkNode.java
@@ -33,8 +33,6 @@ public class NetworkNode implements IOReadableWritable {
private final List<NetworkNode> childNodes = new ArrayList<NetworkNode>();
- private Object attachment;
-
protected NetworkNode(final String name, final NetworkNode parentNode, final NetworkTopology networkTopology) {
this.name = name;
this.parentNode = parentNode;
@@ -119,14 +117,6 @@ public class NetworkNode implements IOReadableWritable {
return this.childNodes.size();
}
- public void setAttachment(final Object attachment) {
- this.attachment = attachment;
- }
-
- public Object getAttachment() {
- return this.attachment;
- }
-
public NetworkNode getChildNode(final int index) {
if (index < this.childNodes.size()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
index 0ca490b..554bac5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/IOUtils.java
@@ -56,6 +56,7 @@ public final class IOUtils {
public static void copyBytes(final InputStream in, final OutputStream out, final int buffSize, final boolean close)
throws IOException {
+ @SuppressWarnings("resource")
final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
final byte[] buf = new byte[buffSize];
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index e4f0a4b..fe63ebe 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -14,7 +14,6 @@
package eu.stratosphere.pact.runtime.cache;
import eu.stratosphere.api.common.cache.DistributedCache;
-
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -138,14 +137,12 @@ public class FileCache {
* Asynchronous file copy process
*/
private class CopyProcess implements Callable<Path> {
+
private JobID jobID;
- @SuppressWarnings("unused")
- private String name;
private String filePath;
private Boolean executable;
public CopyProcess(String name, DistributedCacheEntry e, JobID jobID) {
- this.name = name;
this.filePath = e.filePath;
this.executable = e.isExecutable;
this.jobID = jobID;
@@ -168,15 +165,13 @@ public class FileCache {
* If no task is using this file after 5 seconds, clear it.
*/
private class DeleteProcess implements Runnable {
+
private String name;
- @SuppressWarnings("unused")
- private String filePath;
private JobID jobID;
private int oldCount;
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
- this.filePath = e.filePath;
this.jobID = jobID;
this.oldCount = c;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
index ddfa446..a060d28 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstHashMatchIterator.java
@@ -60,7 +60,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1,
TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
TypePairComparator<V2, V1> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+ MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException
{
this.memManager = memManager;
@@ -73,7 +73,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
this.probeCopy = serializer2.createInstance();
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2, pairComparator,
- memManager, ioManager, ownerTask, totalMemory);
+ memManager, ioManager, ownerTask, memoryFraction);
}
// --------------------------------------------------------------------------------------------
@@ -152,10 +152,10 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+ MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction) // memory is now requested as a relative fraction instead of an absolute size
throws MemoryAllocationException
{
- final int numPages = memManager.computeNumberOfPages(totalMemory);
+ final int numPages = memManager.computeNumberOfPages(memoryFraction); // the memory manager translates the fraction into a page count
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages); // pages are allocated with ownerTask as the owner
return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager); // the hash table is built over exactly the segments allocated above
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
index d699462..8c2b9ca 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/hash/BuildFirstReOpenableHashMatchIterator.java
@@ -38,21 +38,21 @@ public class BuildFirstReOpenableHashMatchIterator<V1, V2, O> extends BuildFirst
TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
TypePairComparator<V2, V1> pairComparator,
MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask, long totalMemory)
+ AbstractInvokable ownerTask, double memoryFraction)
throws MemoryAllocationException {
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
- totalMemory);
+ memoryFraction);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, long totalMemory)
+ MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction) // memory is now requested as a relative fraction instead of an absolute size
throws MemoryAllocationException
{
- final int numPages = memManager.computeNumberOfPages(totalMemory);
+ final int numPages = memManager.computeNumberOfPages(memoryFraction); // the memory manager translates the fraction into a page count
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages); // pages are allocated with ownerTask as the owner
return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager); // same construction as the parent class, but yields the re-openable table variant
}