You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:17 UTC
[20/34] Offer buffer-oriented API for I/O (#25)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index eba81a2..846ca2e 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -75,7 +75,7 @@ import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.instance.local.LocalInstanceManager;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
@@ -90,8 +90,6 @@ import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
import eu.stratosphere.nephele.jobmanager.web.WebInfoServer;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
-import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
-import eu.stratosphere.nephele.multicast.MulticastManager;
import eu.stratosphere.nephele.profiling.JobManagerProfiler;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
@@ -103,11 +101,10 @@ import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
-import eu.stratosphere.nephele.taskmanager.runtime.ExecutorThreadFactory;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
import eu.stratosphere.nephele.topology.NetworkTopology;
import eu.stratosphere.nephele.types.IntegerRecord;
import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -141,8 +138,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private final InputSplitManager inputSplitManager;
private final AbstractScheduler scheduler;
-
- private final MulticastManager multicastManager;
private AccumulatorManager accumulatorManager;
@@ -246,9 +241,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
throw new Exception("Unable to load scheduler " + schedulerClassName);
}
- // Create multicastManager
- this.multicastManager = new MulticastManager(this.scheduler);
-
// Load profiler if it should be used
if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.JOBMANAGER_CLASSNAME_KEY,
@@ -732,8 +724,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
- public ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller, final JobID jobID,
- final ChannelID sourceChannelID) {
+ public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
if (eg == null) {
@@ -754,7 +745,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (sourceChannelID.equals(edge.getInputChannelID())) {
// Request was sent from an input channel
-
final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
final AbstractInstance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
@@ -768,9 +758,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Check execution state
final ExecutionState executionState = connectedVertex.getExecutionState();
if (executionState == ExecutionState.FINISHED) {
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady();
+ // that should not happen. if there is data pending, the receiver cannot be ready
+ return ConnectionInfoLookupResponse.createReceiverNotFound();
}
+ // running is common; finishing happens when the lookup is for the close event
if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
// LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
return ConnectionInfoLookupResponse.createReceiverNotReady();
@@ -781,74 +773,53 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
} else {
// Receiver runs on a different task manager
-
final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
- final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
+ final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge
- .getConnectionID()));
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
}
}
+ // else, the request is for an output channel
+ // Find vertex of connected input channel
+ final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
- if (edge.isBroadcast()) {
+ // Check execution state
+ final ExecutionState executionState = targetVertex.getExecutionState();
- return multicastManager.lookupConnectionInfo(caller, jobID, sourceChannelID);
+ // check whether the task needs to be deployed
+ if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
- } else {
-
- // Find vertex of connected input channel
- final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-
- // Check execution state
- final ExecutionState executionState = targetVertex.getExecutionState();
-
- if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING
- && executionState != ExecutionState.FINISHED) {
-
- if (executionState == ExecutionState.ASSIGNED) {
-
- final Runnable command = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- scheduler.deployAssignedVertices(targetVertex);
- }
- };
-
- eg.executeCommand(command);
- }
-
- // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
- if (assignedInstance == null) {
- LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID()
- + " but no instance assigned");
- // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
- return ConnectionInfoLookupResponse.createReceiverNotReady();
+ if (executionState == ExecutionState.ASSIGNED) {
+ final Runnable command = new Runnable() {
+ @Override
+ public void run() {
+ scheduler.deployAssignedVertices(targetVertex);
+ }
+ };
+ eg.executeCommand(command);
}
- if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
- // Receiver runs on the same task manager
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
- } else {
- // Receiver runs on a different task manager
- final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
- final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
+ // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
+ return ConnectionInfoLookupResponse.createReceiverNotReady();
+ }
- return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge
- .getConnectionID()));
- }
+ final AbstractInstance assignedInstance = targetVertex.getAllocatedResource().getInstance();
+ if (assignedInstance == null) {
+ LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
+ // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
+ return ConnectionInfoLookupResponse.createReceiverNotReady();
}
- // LOG.error("Receiver(s) not found");
+ if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
+ // Receiver runs on the same task manager
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
+ } else {
+ // Receiver runs on a different task manager
+ final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
+ final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
- // return ConnectionInfoLookupResponse.createReceiverNotFound();
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
+ }
}
/**
@@ -921,40 +892,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return eventList;
}
-
- @Override
- public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
- if (eg == null) {
- LOG.error("Cannot find execution graph for job " + jobID);
- return;
- }
-
- final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
- if (vertex == null) {
- LOG.error("Cannot find execution vertex with ID " + id);
- return;
- }
-
- LOG.info("Killing task " + vertex + " of job " + jobID);
-
- final Runnable runnable = new Runnable() {
-
- @Override
- public void run() {
-
- final TaskKillResult result = vertex.killTask();
- if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
- LOG.error(result.getDescription());
- }
- }
- };
-
- eg.executeCommand(runnable);
- }
-
-
@Override
public void killInstance(final StringRecord instanceName) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
index c9e02d3..24e2970 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/AbstractScheduler.java
@@ -210,7 +210,7 @@ public abstract class AbstractScheduler implements InstanceListener {
case NETWORK:
deployTarget = false;
break;
- case INMEMORY:
+ case IN_MEMORY:
deployTarget = true;
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
index efbaf93..762b494 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
@@ -32,8 +32,7 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.DummyInstance;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.AbstractTaskResult.ReturnCode;
+import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.util.SerializableHashSet;
import eu.stratosphere.util.StringUtils;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
index 8ecd99f..58486da 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdge.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.managementgraph;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* This class implements a directed edge of a {@link ManagementGraph}. The edge is derived from a channel of the actual
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
index 20107a7..d263999 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementEdgeID.java
@@ -13,8 +13,8 @@
package eu.stratosphere.nephele.managementgraph;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.nephele.AbstractID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
/**
* A management edge ID uniquely identifies a {@link ManagementEdge}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
index 73548be..63aa6f7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGateID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.managementgraph;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* A management gate ID uniquely identifies a {@link ManagementGate}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
index f4a9855..374656b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
@@ -28,7 +28,7 @@ import java.util.Map;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.util.EnumUtils;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
index 01a0903..7e6d554 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupEdge.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.managementgraph;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
/**
* This class implements a directed edge of between two {@link ManagementGroupVertex} objects. The edge is derived from
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
index bbf5fd2..b98a153 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
@@ -24,7 +24,7 @@ import java.util.Map;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.util.EnumUtils;
import eu.stratosphere.util.StringUtils;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
index b5c1055..aef64af 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertexID.java
@@ -15,7 +15,7 @@ package eu.stratosphere.nephele.managementgraph;
import javax.xml.bind.DatatypeConverter;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* A management group vertex ID uniquely identifies a {@link ManagementGroupVertex}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
index 716ae7f..486b3fa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertexID.java
@@ -13,7 +13,7 @@
package eu.stratosphere.nephele.managementgraph;
-import eu.stratosphere.nephele.io.AbstractID;
+import eu.stratosphere.nephele.AbstractID;
/**
* A management vertex ID uniquely identifies a {@link ManagementVertex}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
deleted file mode 100644
index 87e0181..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastCluster.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * This class represents a cluster of hosts within a multicast-tree.
- *
- */
-
-public class MulticastCluster {
- TreeNode master = null;
-
- HashSet<TreeNode> clusternodes = new HashSet<TreeNode>();
-
- public void addNode(TreeNode node) {
- this.clusternodes.add(node);
- }
-
- public int getSize() {
- return this.clusternodes.size();
- }
-
- public HashSet<TreeNode> getNodes() {
- return this.clusternodes;
- }
-
- /**
- * Returns the master-node of the current cluster.
- *
- * @return
- */
- public TreeNode getMaster() {
-
- if (this.master == null) {
-
- // TODO: topology-aware!!
- if (clusternodes.size() != 0) {
- this.master = clusternodes.iterator().next();
- } else {
- System.out.println("cluster is empty.");
- return null;
- }
-
- }
-
- return this.master;
- }
-
- /**
- * Splits the cluster into an arbitrary number of clusters not exceeding maxsize.
- *
- * @param maxsize
- * @return
- */
- public HashSet<MulticastCluster> splitCluster(int maxsize) {
- // TODO: topology-aware!
- HashSet<MulticastCluster> newClusters = new HashSet<MulticastCluster>();
-
- MulticastCluster actualcluster = new MulticastCluster();
-
- for (Iterator<TreeNode> i = this.clusternodes.iterator(); i.hasNext();) {
- if (actualcluster.getSize() < maxsize) {
- actualcluster.addNode(i.next());
- } else {
- // cluster is full.. add old cluster to list
- newClusters.add(actualcluster);
- // and create new cluster object
- actualcluster = new MulticastCluster();
- actualcluster.addNode(i.next());
- }
- }
-
- newClusters.add(actualcluster);
-
- return newClusters;
-
- }
-
- public static MulticastCluster createInitialCluster(Collection<TreeNode> nodes) {
- // TODO: topology-aware? in here?
-
- MulticastCluster cluster = new MulticastCluster();
-
- for (TreeNode n : nodes) {
- cluster.addNode(n);
- }
-
- return cluster;
-
- }
-
- public static MulticastForwardingTable createClusteredTree(LinkedList<TreeNode> nodes, int maxclustersize) {
-
- return null;
- /*
- // List to store all levels of the clustered multicast tree
- LinkedList<HashSet<MulticastCluster>> clusterlist = new LinkedList<HashSet<MulticastCluster>>();
-
- // Poll off the sending node first..
- TreeNode source = nodes.pollFirst();
-
- // Create an initital multicast cluster containing all receivers
- MulticastCluster initialcluster = createInitialCluster(nodes);
-
- // Create a first layer of clusters with arbitrary size by splitting the initital cluster
- HashSet<MulticastCluster> firstlaycluster = initialcluster.splitCluster(maxclustersize);
-
- // add to the list of cluster layers
- clusterlist.add(firstlaycluster);
-
- // we want the top layer to consist of max. maxclustersize clusters
- while (clusterlist.getFirst().size() > maxclustersize) {
-
- // creating a new cluster-layer...
- MulticastCluster nextlayercluster = new MulticastCluster();
-
- HashSet<MulticastCluster> lowerlayer = clusterlist.getFirst();
-
- // add all master nodes from current layer to next-layer cluster...
- for (MulticastCluster c : lowerlayer) {
- nextlayercluster.addNode(c.getMaster());
- }
-
- // if our next-layer cluster is still too big, we need to split it again
- HashSet<MulticastCluster> nextlayerclusters = nextlayercluster.splitCluster(maxclustersize);
-
- // and finally ad the new layer of clusters
- clusterlist.addFirst(nextlayerclusters);
-
- }
-
- // now we can create the tree...
-
- MulticastForwardingTable table = new MulticastForwardingTable();
-
- HashSet<MulticastCluster> initialclusterlevel = clusterlist.getFirst();
-
- ConnectionInfoLookupResponse sourceentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
-
- // add all local targets
- for (ChannelID id : source.getLocalTargets()) {
- System.out.println("local target: " + id);
- sourceentry.addLocalTarget(id);
- }
-
- // connect source node with all master nodes in top-level clusters
- for (MulticastCluster c : initialclusterlevel) {
- sourceentry.addRemoteTarget(c.getMaster().getConnectionInfo());
- }
-
- table.addConnectionInfo(source.getConnectionInfo(), sourceentry);
- System.out.println("forwards for node: " + source.getConnectionInfo());
- System.out.println(sourceentry);
- // now we have connected the source node to the initial cluster layer. iterate through cluster layers and
- // connect
-
- while (clusterlist.size() > 0) {
- HashSet<MulticastCluster> actualclusterlevel = clusterlist.pollFirst();
-
- // add remote targets!
-
- for (MulticastCluster c : actualclusterlevel) {
- TreeNode master = c.getMaster();
- for (Iterator<TreeNode> i = c.getNodes().iterator(); i.hasNext();) {
- TreeNode actualnode = i.next();
- if (!actualnode.equals(master)) {
- // add remote target at master of current cluster
- master.addRemoteTarget(actualnode.getConnectionInfo());
- }
- }
- }
-
- }
-
- // now iterate through all nodes and create forwarding table...
- // we already have the entry for the source node..
- for (TreeNode n : nodes) {
- ConnectionInfoLookupResponse actualentry = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
- for (ChannelID localTarget : n.getLocalTargets()) {
- actualentry.addLocalTarget(localTarget);
- }
- for (InstanceConnectionInfo remoteTarget : n.getRemoteTargets()) {
- actualentry.addRemoteTarget(remoteTarget);
- }
- table.addConnectionInfo(n.getConnectionInfo(), actualentry);
- System.out.println("forwards for node: " + n.getConnectionInfo());
- System.out.println(actualentry);
- }
-
- return table;
- */
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
deleted file mode 100644
index d08821e..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastForwardingTable.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-
-/**
- * This class contains ConnectionInfoLookupResponse objects containing local, as well as remote receivers for all
- * instances within a certain job-specific multicast tree.
- *
- */
-public class MulticastForwardingTable {
-
- private final Map<InstanceConnectionInfo, ConnectionInfoLookupResponse> forwardingTable = new HashMap<InstanceConnectionInfo, ConnectionInfoLookupResponse>();
-
- /**
- * Returns the related ConnectionInfoLookupResponse for the calling Instance.
- *
- * @param caller
- * @return
- */
- public ConnectionInfoLookupResponse getConnectionInfo(InstanceConnectionInfo caller) {
- if (this.forwardingTable.containsKey(caller)) {
- return this.forwardingTable.get(caller);
- } else {
- return null;
- }
- }
-
- protected void addConnectionInfo(InstanceConnectionInfo caller, ConnectionInfoLookupResponse response) {
- this.forwardingTable.put(caller, response);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
deleted file mode 100644
index 0a7b471..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/MulticastManager.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.executiongraph.ExecutionEdge;
-import eu.stratosphere.nephele.executiongraph.ExecutionGate;
-import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
-import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-
-/**
- * The MulticastManager is responsible for the creation and storage of application-layer multicast trees used to
- * broadcast records to multiple target vertices.
- *
- */
-
-public final class MulticastManager implements ChannelLookupProtocol {
-
- /**
- * The log object used to report errors and warnings.
- */
- private static final Log LOG = LogFactory.getLog(JobManager.class);
-
- /**
- * Indicates if the arrangement of nodes within the overlay-tree should be randomized or not. If set to false,
- * arrangement of the same set of receiver nodes is guaranteed to be the same
- */
- private final boolean randomized;
-
- /**
- * Indicates if the tree should be constructed with a given topology stored in a file.
- */
- private final boolean useHardCodedTree;
-
- /**
- * File containing the hard-coded tree topology, if desired should contain node names (e.g. hostnames) with
- * corresponding children per line.
- * For example, a line "vm1.local vm2.local vm3.local" would result in vm1.local connecting to vm2.local and
- * vm3.local as children. No further checking for connectivity of the given topology is done!
- */
- private final String hardCodedTreeFilePath;
-
- /**
- * Indicates the desired branching of the generated multicast-tree. 0 means unicast transmission, 1 sequential tree,
- * 2 binomial tree, 3+ clustered tree
- */
- private final int treeBranching;
-
- /**
- * Reference to the scheduler.
- */
- private final AbstractScheduler scheduler;
-
- /**
- * Map caching already computed multicast forwarding tables.
- */
- private final Map<ChannelID, MulticastForwardingTable> cachedTrees = new HashMap<ChannelID, MulticastForwardingTable>();
-
- /**
- * Constructs a new multicast manager.
- *
- * @param scheduler
- * reference to the scheduler
- */
- public MulticastManager(final AbstractScheduler scheduler) {
-
- this.scheduler = scheduler;
-
- this.randomized = GlobalConfiguration.getBoolean("multicast.randomize", false);
- this.treeBranching = GlobalConfiguration.getInteger("multicast.branching", 1);
- this.useHardCodedTree = GlobalConfiguration.getBoolean("multicast.usehardcodedtree", false);
- this.hardCodedTreeFilePath = GlobalConfiguration.getString("multicast.hardcodedtreefile", null);
- }
-
- /**
- * Retrieves all recipients of data for the given <code>sourceChannelID</code>. Returns both local recipients as
- * well as next-hop remote instances within the multicast-tree.
- *
- * @param caller
- * the {@link InstanceConnectionInfo} object of the task manager which calls this method
- * @param jobID
- * the ID of the job the channel ID belongs to
- * @param sourceChannelID
- * the ID of the channel to resolve
- * @return the lookup response containing the connection info and a return code
- */
- public synchronized ConnectionInfoLookupResponse lookupConnectionInfo(final InstanceConnectionInfo caller,
- final JobID jobID, final ChannelID sourceChannelID) {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Receiving multicast receiver request from " + caller + " channel ID: " + sourceChannelID);
- }
-
- // Check if the tree is already created and cached
- if (this.cachedTrees.containsKey(sourceChannelID)) {
-
- LOG.info("Replying with cached entry...");
- return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
-
- } else {
-
- // No tree exists, so we assume that this is the sending node initiating a multicast
-
- if (!checkIfAllTargetVerticesReady(caller, jobID, sourceChannelID)) {
- LOG.info("Received multicast request but not all receivers ready.");
-
- return ConnectionInfoLookupResponse.createReceiverNotReady();
- }
-
- // Receivers are up and running.. extract tree nodes...
- LinkedList<TreeNode> treeNodes = extractTreeNodes(caller, jobID, sourceChannelID, this.randomized);
-
- // Do we want to use a hard-coded tree topology?
- if (this.useHardCodedTree) {
- LOG.info("Creating a hard-coded tree topology from file: " + hardCodedTreeFilePath);
- cachedTrees.put(sourceChannelID, createHardCodedTree(treeNodes));
- return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
- }
-
- // Otherwise we create a default tree and put it into the tree-cache
- cachedTrees.put(sourceChannelID, createDefaultTree(treeNodes, this.treeBranching));
- return cachedTrees.get(sourceChannelID).getConnectionInfo(caller);
-
- }
-
- }
-
- /**
- * Returns and removes the TreeNode which is closest to the given indicator.
- *
- * @param indicator
- * @param nodes
- * @return
- */
- private TreeNode pollClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {
-
- TreeNode closestnode = getClosestNode(indicator, nodes);
-
- nodes.remove(closestnode);
-
- return closestnode;
-
- }
-
- /**
- * Returns the TreeNode which is closest to the given indicator Node. Proximity is determined
- * either using topology-information (if given), penalty information (if given) or it returns
- * the first node in the list.
- *
- * @param indicator
- * @param nodes
- * @return
- */
- private TreeNode getClosestNode(final TreeNode indicator, final LinkedList<TreeNode> nodes) {
-
- if (indicator == null) {
- return nodes.getFirst();
- }
-
- TreeNode closestNode = null;
- for (TreeNode n : nodes) {
- if (closestNode == null || n.getDistance(indicator) < closestNode.getDistance(indicator)) {
- closestNode = n;
- }
- }
-
- return closestNode;
- }
-
- /**
- * This method creates a tree with an arbitrary fan out (two means binary tree).
- * If topology information or penalties are available, it considers that.
- * If fanout is set to 1, it creates a sequential tree.
- * If fanout is set to Integer.MAX_VALUE, it creates a unicast tree.
- *
- * @param nodes
- * @param fanout
- * @return
- */
- private MulticastForwardingTable createDefaultTree(LinkedList<TreeNode> nodes, int fanout) {
-
- // Store nodes that already have a parent, but no children
- LinkedList<TreeNode> connectedNodes = new LinkedList<TreeNode>();
-
- final TreeNode rootnode = nodes.pollFirst();
- TreeNode actualnode = rootnode;
-
- while (nodes.size() > 0) { // We still have unconnected nodes...
-
- for (int i = 0; i < fanout; i++) {
-
- if (nodes.size() > 0) {
- // pick the closest one and attach to actualnode
- TreeNode child = pollClosestNode(actualnode, nodes);
- actualnode.addChild(child);
-
- // The child is now connected and can be used as forwarder in the next iteration..
- connectedNodes.add(child);
- } else {
- break;
- }
- }
-
- // OK.. take the next node to attach children to it..
- // TODO: Optimization? "pollBest()" ?
- actualnode = connectedNodes.pollFirst();
-
- }
- LOG.info("created multicast tree with following topology:\n" + rootnode.printTree());
-
- return rootnode.createForwardingTable();
-
- }
-
- /**
- * Reads a hard-coded tree topology from file and creates a tree according to the hard-coded
- * topology from the file.
- *
- * @param nodes
- * @return
- */
- private MulticastForwardingTable createHardCodedTree(LinkedList<TreeNode> nodes) {
- try {
- FileInputStream fstream = new FileInputStream(this.hardCodedTreeFilePath);
- DataInputStream in = new DataInputStream(fstream);
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- String strLine;
- while ((strLine = br.readLine()) != null) {
- String[] values = strLine.split(" ");
- String actualhostname = values[0];
- for (TreeNode n : nodes) {
- if (n.toString().equals(actualhostname)) {
- // we found the node.. connect the children
- for (int i = 1; i < values.length; i++) {
- for (TreeNode childnode : nodes) {
- if (childnode.toString().equals(values[i])) {
- n.addChild(childnode);
- }
- }
- }
- }
- }
- }
- br.close();
- // First node is root.. create tree. easy
- return nodes.getFirst().createForwardingTable();
-
- } catch (Exception e) {
- System.out.println("Error reading hard-coded topology file for multicast tree: " + e.getMessage());
- return null;
- }
- }
-
- /**
- * Checks if all target vertices for multicast transmission are ready. If vertices are in state ASSIGNED, it will
- * deploy those vertices.
- *
- * @param caller
- * @param jobID
- * @param sourceChannelID
- * @return
- */
- private boolean checkIfAllTargetVerticesReady(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-
- final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);
-
- final ExecutionGate broadcastGate = outputChannel.getOutputGate();
-
- List<ExecutionVertex> verticesToDeploy = null;
-
- // get all broadcast output channels
- final int numberOfOutputChannels = broadcastGate.getNumberOfEdges();
- for (int i = 0; i < numberOfOutputChannels; ++i) {
-
- final ExecutionEdge c = broadcastGate.getEdge(i);
-
- if (c.isBroadcast()) {
-
- final ExecutionVertex targetVertex = c.getInputGate().getVertex();
-
- if (targetVertex.getExecutionState() == ExecutionState.ASSIGNED) {
- if (verticesToDeploy == null) {
- verticesToDeploy = new ArrayList<ExecutionVertex>();
- }
- verticesToDeploy.add(targetVertex);
- } else {
-
- if (targetVertex.getExecutionState() != ExecutionState.RUNNING
- && targetVertex.getExecutionState() != ExecutionState.FINISHING) {
- return false;
- }
- }
- }
- }
-
- if (verticesToDeploy != null) {
- this.scheduler.deployAssignedVertices(verticesToDeploy);
- return false;
- }
-
- return true;
- }
-
- /**
- * Returns a list of (physical) Nodes (=hosts) within the multicast tree. Each node contains the local ChannelIDs,
- * records
- * must be forwarded to. The first node in the List is the only multicast sender.
- *
- * @param sourceChannelID
- * @return
- */
- private LinkedList<TreeNode> extractTreeNodes(final InstanceConnectionInfo source, final JobID jobID,
- final ChannelID sourceChannelID, final boolean randomize) {
-
- final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
-
- final ExecutionEdge outputChannel = eg.getEdgeByID(sourceChannelID);
-
- final ExecutionGate broadcastGate = outputChannel.getOutputGate();
-
- final LinkedList<ExecutionEdge> outputChannels = new LinkedList<ExecutionEdge>();
-
- // Get all broadcast output channels
- final int numberOfOutputChannels = broadcastGate.getNumberOfEdges();
- for (int i = 0; i < numberOfOutputChannels; ++i) {
- final ExecutionEdge c = broadcastGate.getEdge(i);
-
- if (c.isBroadcast()) {
- outputChannels.add(c);
- }
- }
-
- final LinkedList<TreeNode> treeNodes = new LinkedList<TreeNode>();
-
- LinkedList<ChannelID> actualLocalTargets = new LinkedList<ChannelID>();
-
- int firstConnectionID = 0;
- // search for local targets for the tree node
- for (Iterator<ExecutionEdge> iter = outputChannels.iterator(); iter.hasNext();) {
-
- final ExecutionEdge actualOutputChannel = iter.next();
-
- // the connection ID should not be needed for the root node (as it is not set as remote receiver)
- // but in order to maintain consistency, it also gets the connectionID of the first channel pointing to it
- firstConnectionID = actualOutputChannel.getConnectionID();
-
- final ExecutionVertex targetVertex = actualOutputChannel.getInputGate().getVertex();
-
- // is the target vertex running on the same instance?
- if (targetVertex.getAllocatedResource().getInstance().getInstanceConnectionInfo().equals(source)) {
-
- actualLocalTargets.add(actualOutputChannel.getInputChannelID());
- iter.remove();
- }
-
- }
-
- // create sender node (root) with source instance
- TreeNode actualNode = new TreeNode(eg.getVertexByChannelID(sourceChannelID).getAllocatedResource()
- .getInstance(), source, firstConnectionID, actualLocalTargets);
-
- treeNodes.add(actualNode);
-
- // now we have the root-node.. lets extract all other nodes
-
- LinkedList<TreeNode> receiverNodes = new LinkedList<TreeNode>();
-
- while (outputChannels.size() > 0) {
-
- final ExecutionEdge firstChannel = outputChannels.pollFirst();
-
- // each receiver nodes' endpoint is associated with the connection ID
- // of the first channel pointing to this node.
- final int connectionID = firstChannel.getConnectionID();
-
- final ExecutionVertex firstTarget = firstChannel.getInputGate().getVertex();
-
- final InstanceConnectionInfo actualInstance = firstTarget.getAllocatedResource().getInstance()
- .getInstanceConnectionInfo();
-
- actualLocalTargets = new LinkedList<ChannelID>();
-
- // add first local target
- actualLocalTargets.add(firstChannel.getInputChannelID());
-
- // now we iterate through the remaining channels to find other local targets...
- for (Iterator<ExecutionEdge> iter = outputChannels.iterator(); iter.hasNext();) {
-
- final ExecutionEdge actualOutputChannel = iter.next();
-
- final ExecutionVertex actualTarget = actualOutputChannel.getInputGate().getVertex();
-
- // is the target vertex running on the same instance?
- if (actualTarget.getAllocatedResource().getInstance().getInstanceConnectionInfo()
- .equals(actualInstance)) {
- actualLocalTargets.add(actualOutputChannel.getInputChannelID());
-
- iter.remove();
-
- }
-
- }// end for
-
- // create tree node for current instance
- actualNode = new TreeNode(firstTarget.getAllocatedResource().getInstance(), actualInstance, connectionID,
- actualLocalTargets);
-
- receiverNodes.add(actualNode);
-
- }// end while
-
- // Do we want to shuffle the receiver nodes?
- // Only randomize the receivers, as the sender (the first one) has to stay the same
- if (randomize) {
- Collections.shuffle(receiverNodes);
- } else {
- // Sort Tree Nodes according to host name..
- Collections.sort(receiverNodes);
- }
-
- treeNodes.addAll(receiverNodes);
-
- return treeNodes;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
deleted file mode 100644
index 0e80ba2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TopologyInformationSupplier.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-public class TopologyInformationSupplier {
-
-
- /**
- * Returns a float representing the network distance between two nodes.
- * @param node1
- * @param node2
- * @return
- */
- public float getNetworkDistance(String node1, String node2){
- //TODO: Implement me
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
deleted file mode 100644
index 12ee998..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/multicast/TreeNode.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.multicast;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-
-import eu.stratosphere.nephele.instance.AbstractInstance;
-import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
-
-/**
- * Each physical node (instance) within a multicast tree is represented by a TreeNode object.
- * It contains the connection info for the certain node and a list of the local output channels.
- *
- */
-
-public class TreeNode implements Comparable<TreeNode> {
-
- private TreeNode parentnode = null;
-
- private final AbstractInstance instance;
-
- private final InstanceConnectionInfo nodeConnectionInfo;
-
- private final int connectionID;
-
- private final LinkedList<ChannelID> localTargets;
-
- private final LinkedList<TreeNode> children = new LinkedList<TreeNode>();
-
- private final LinkedList<IntegerProperty> properties = new LinkedList<TreeNode.IntegerProperty>();
-
- private int penalty = 0;
-
- public TreeNode(AbstractInstance instance, InstanceConnectionInfo nodeConnectionInfo, int connectionID,
- LinkedList<ChannelID> localTargets) {
- this.instance = instance;
- this.connectionID = connectionID;
- this.nodeConnectionInfo = nodeConnectionInfo;
- this.localTargets = localTargets;
- }
-
- public void setProperty(String key, int value) {
- boolean exists = false;
- for (IntegerProperty property : this.properties) {
- if (property.getKey().equals(key)) {
- property.setValue(value);
- exists = true;
- break;
- }
- }
- if (!exists) {
- this.properties.add(new IntegerProperty(key, value));
- }
- }
-
- public int getProperty(String key) {
- for (IntegerProperty property : this.properties) {
- if (property.getKey().equals(key)) {
- return property.getValue();
- }
- }
- return -1;
- }
-
- public void removeChild(TreeNode child) {
- if (this.children.contains(child)) {
- child.setParent(null);
- this.children.remove(child);
- }
- }
-
- public void addChild(TreeNode child) {
- this.children.add(child);
- child.setParent(this);
- }
-
- public LinkedList<TreeNode> getChildren() {
- return this.children;
- }
-
- public TreeNode getParent() {
- return this.parentnode;
- }
-
- private InstanceConnectionInfo getConnectionInfo() {
- return this.nodeConnectionInfo;
- }
-
- private int getConnectionID() {
- return this.connectionID;
- }
-
- private void setParent(TreeNode parent) {
- this.parentnode = parent;
- }
-
- @Override
- public int compareTo(TreeNode o) {
- return this.nodeConnectionInfo.compareTo(o.nodeConnectionInfo);
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof TreeNode) {
- return this.nodeConnectionInfo.equals(((TreeNode) o).nodeConnectionInfo);
- } else {
- return false;
- }
- }
-
- public int getDistance(TreeNode o) {
- return this.instance.getDistance(o.instance);
- }
-
- public String toString() {
- return this.nodeConnectionInfo.toString();
- }
-
- public int getPenalty() {
- return this.penalty;
- }
-
- public void setPenalty(int penalty) {
- this.penalty = penalty;
- }
-
- /**
- * This method should be called on the root node (sender node).
- * It traverses the Tree and returns a full forwarding table
- * including all local and remote receivers.
- *
- * @return
- */
- public MulticastForwardingTable createForwardingTable() {
- MulticastForwardingTable table = new MulticastForwardingTable();
- this.generateRecursiveForwardingTable(table);
- return table;
- }
-
- /**
- * Private recursive method to generate forwarding table
- *
- * @param table
- */
- private void generateRecursiveForwardingTable(MulticastForwardingTable table) {
-
- final ConnectionInfoLookupResponse lookupResponse = ConnectionInfoLookupResponse.createReceiverFoundAndReady();
-
- // add local targets
- for (final ChannelID i : this.localTargets) {
- lookupResponse.addLocalTarget(i);
- }
-
- // add remote targets
- for (final TreeNode n : this.children) {
-
- // Instance Connection info associated with the remote target
- final InstanceConnectionInfo ici = n.getConnectionInfo();
-
- // get the connection ID associated with the remote target endpoint
- final int icid = n.getConnectionID();
-
- final InetSocketAddress isa = new InetSocketAddress(ici.getAddress(), ici.getDataPort());
-
- lookupResponse.addRemoteTarget(new RemoteReceiver(isa, icid));
- }
-
- table.addConnectionInfo(this.nodeConnectionInfo, lookupResponse);
-
- for (final TreeNode n : this.children) {
- n.generateRecursiveForwardingTable(table);
- }
- }
-
- /**
- * Prints the tree in a human readable format, starting with the actual node as root.
- *
- * @return
- */
- public String printTree() {
-
- StringBuilder sb = new StringBuilder();
- this.printRecursiveTree(sb);
- return sb.toString();
- }
-
- private void printRecursiveTree(StringBuilder sb) {
-
- if (this.children.size() > 0) {
- sb.append("STRUCT ");
-
- sb.append(this.nodeConnectionInfo);
-
- for (TreeNode n : this.children) {
- sb.append(" ");
- sb.append(n.getConnectionInfo().toString());
- }
-
- sb.append("\n");
-
- for (TreeNode n : this.children) {
- n.printRecursiveTree(sb);
- }
- }
- }
-
- private static class IntegerProperty {
-
- private String key = null;
-
- private int value = 0;
-
- public IntegerProperty(final String key, final int value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return this.key;
- }
-
- public int getValue() {
- return this.value;
- }
-
- public void setValue(final int value) {
- this.value = value;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
index b1d6ff6..302192b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/TaskManagerProfiler.java
@@ -15,7 +15,7 @@ package eu.stratosphere.nephele.profiling;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.taskmanager.Task;
/**
* This interface must be implemented by profiling components
@@ -32,8 +32,7 @@ public interface TaskManagerProfiler {
* @param jobConfiguration
* the job configuration sent with the task
*/
- void registerExecutionListener(RuntimeTask task, Configuration jobConfiguration);
-
+ void registerExecutionListener(Task task, Configuration jobConfiguration);
/**
* Unregisters all previously registered {@link ExecutionListener} objects for
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
index 49fa8d1..58f0f38 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/TaskManagerProfilerImpl.java
@@ -13,20 +13,6 @@
package eu.stratosphere.nephele.profiling.impl;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.execution.Environment;
@@ -40,8 +26,21 @@ import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.ProfilingDataContainer;
-import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
+import eu.stratosphere.nephele.taskmanager.Task;
import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerProfiler {
@@ -99,7 +98,7 @@ public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerPro
@Override
- public void registerExecutionListener(final RuntimeTask task, final Configuration jobConfiguration) {
+ public void registerExecutionListener(final Task task, final Configuration jobConfiguration) {
// Register profiling hook for the environment
task.registerExecutionListener(new EnvironmentListenerImpl(this, task.getRuntimeEnvironment()));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
index 3f1d669..8fdaf55 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
@@ -17,9 +17,9 @@ import java.io.IOException;
import eu.stratosphere.core.protocols.VersionedProtocol;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
/**
* The channel lookup protocol can be used to resolve the ID of an output channel to all recipients which shall receive
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index a2578a5..461c797 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -24,7 +24,6 @@ import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
-import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.topology.NetworkTopology;
/**
@@ -82,18 +81,6 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
List<AbstractEvent> getEvents(JobID jobID) throws IOException;
/**
- * Kills the task with the given vertex ID.
- *
- * @param jobID
- * the ID of the job the vertex to be killed belongs to
- * @param id
- * the vertex ID which identifies the task to be killed
- * @throws IOException
- * thrown if an error occurs while transmitting the kill request
- */
- void killTask(JobID jobID, ManagementVertexID id) throws IOException;
-
- /**
* Kills the instance with the given name (i.e. shuts down its task manager).
*
* @param instanceName
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
index 63509ab..19522db 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
@@ -23,9 +23,8 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
-import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
/**
@@ -59,17 +58,6 @@ public interface TaskOperationProtocol extends VersionedProtocol {
TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException;
/**
- * Advises the task manager to kill the task with the given ID.
- *
- * @param id
- * the ID of the task to kill
- * @return the result of the task kill attempt
- * @throws IOException
- * thrown if an error occurs during this remote procedure call
- */
- TaskKillResult killTask(ExecutionVertexID id) throws IOException;
-
- /**
* Queries the task manager about the cache status of the libraries stated in the {@link LibraryCacheProfileRequest}
* object.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
index be24922..8fdaa5d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/iomanager/BlockChannelAccess.java
@@ -134,9 +134,7 @@ public abstract class BlockChannelAccess<R extends IORequest, C extends Collecti
this.closeLock.wait(1000);
checkErroneous();
}
- catch (InterruptedException iex) {
- throw new IOException("Block channel access was interrupted while closing.");
- }
+ catch (InterruptedException iex) {}
}
}
finally {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
index 7680843..a8fe096 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/MemoryManager.java
@@ -56,8 +56,7 @@ public interface MemoryManager {
/**
* Releases all memory segments for the given task.
- *
- * @param <T> The type of memory segment.
+ *
* @param task The task whose memory segments are to be released.
*/
void releaseAll(AbstractInvokable task);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
index 5a74bab..8bc7b13 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/services/memorymanager/spi/DefaultMemoryManager.java
@@ -13,7 +13,6 @@
package eu.stratosphere.nephele.services.memorymanager.spi;
-
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -125,7 +124,6 @@ public class DefaultMemoryManager implements MemoryManager {
}
}
-
@Override
public void shutdown() {
// -------------------- BEGIN CRITICAL SECTION -------------------
@@ -150,7 +148,6 @@ public class DefaultMemoryManager implements MemoryManager {
}
// -------------------- END CRITICAL SECTION -------------------
}
-
public boolean verifyEmpty() {
synchronized (this.lock) {
@@ -161,7 +158,7 @@ public class DefaultMemoryManager implements MemoryManager {
// ------------------------------------------------------------------------
// MemoryManager interface implementation
// ------------------------------------------------------------------------
-
+
@Override
public List<MemorySegment> allocatePages(AbstractInvokable owner, int numPages) throws MemoryAllocationException {
final ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(numPages);
@@ -212,7 +209,6 @@ public class DefaultMemoryManager implements MemoryManager {
}
// ------------------------------------------------------------------------
-
@Override
public void release(MemorySegment segment) {
@@ -254,7 +250,6 @@ public class DefaultMemoryManager implements MemoryManager {
// -------------------- END CRITICAL SECTION -------------------
}
-
@Override
public <T extends MemorySegment> void release(Collection<T> segments) {
@@ -317,7 +312,6 @@ public class DefaultMemoryManager implements MemoryManager {
// -------------------- END CRITICAL SECTION -------------------
}
-
@Override
public void releaseAll(AbstractInvokable owner) {
// -------------------- BEGIN CRITICAL SECTION -------------------
@@ -326,28 +320,27 @@ public class DefaultMemoryManager implements MemoryManager {
if (this.isShutDown) {
throw new IllegalStateException("Memory manager has been shut down.");
}
-
+
// get all segments
final Set<DefaultMemorySegment> segments = this.allocatedSegments.remove(owner);
-
+
// all segments may have been freed previously individually
if (segments == null || segments.isEmpty()) {
return;
}
-
+
// free each segment
for (DefaultMemorySegment seg : segments) {
final byte[] buffer = seg.destroy();
this.freeSegments.add(buffer);
}
-
+
segments.clear();
}
// -------------------- END CRITICAL SECTION -------------------
}
// ------------------------------------------------------------------------
-
@Override
public int getPageSize() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
new file mode 100644
index 0000000..bfad346
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/ExecutorThreadFactory.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.taskmanager;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecutorThreadFactory implements ThreadFactory {
+
+ public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
+
+ private static final String THREAD_NAME = "Nephele Executor Thread ";
+
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+
+ private ExecutorThreadFactory() {}
+
+
+ public Thread newThread(Runnable target) {
+ Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index df9bbde..06eec0c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -15,42 +15,92 @@ package eu.stratosphere.nephele.taskmanager;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
+import eu.stratosphere.nephele.execution.ExecutionListener;
+import eu.stratosphere.nephele.execution.ExecutionObserver;
import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.execution.ExecutionStateTransition;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.TaskContext;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public final class Task implements ExecutionObserver {
+
+ /**
+ * The log object used for debugging.
+ */
+ private static final Log LOG = LogFactory.getLog(Task.class);
+
+ private final ExecutionVertexID vertexID;
+
+ private final RuntimeEnvironment environment;
+
+ private final TaskManager taskManager;
+
+ /**
+ * Stores whether the task has been canceled.
+ */
+ private volatile boolean isCanceled = false;
+
+ /**
+ * The current execution state of the task
+ */
+ private volatile ExecutionState executionState = ExecutionState.STARTING;
+
+ private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
+
+ public Task(final ExecutionVertexID vertexID, final RuntimeEnvironment environment,
+ final TaskManager taskManager) {
+
+ this.vertexID = vertexID;
+ this.environment = environment;
+ this.taskManager = taskManager;
+
+ this.environment.setExecutionObserver(this);
+ }
-public interface Task {
/**
* Returns the ID of the job this task belongs to.
*
* @return the ID of the job this task belongs to
*/
- JobID getJobID();
+ public JobID getJobID() {
+ return this.environment.getJobID();
+ }
/**
* Returns the ID of this task.
*
* @return the ID of this task
*/
- ExecutionVertexID getVertexID();
+ public ExecutionVertexID getVertexID() {
+ return this.vertexID;
+ }
/**
* Returns the environment associated with this task.
*
* @return the environment associated with this task
*/
- Environment getEnvironment();
+ public Environment getEnvironment() {
+ return this.environment;
+ }
/**
* Marks the task as failed and triggers the appropriate state changes.
*/
- void markAsFailed();
+ public void markAsFailed() {
+ executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
+ }
/**
* Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
@@ -58,22 +108,72 @@ public interface Task {
* @return <code>true</code> if the state of this thread which is associated with this task is
* <code>TERMINATED</code>, <code>false</code> otherwise
*/
- boolean isTerminated();
+ public boolean isTerminated() {
+ final Thread executingThread = this.environment.getExecutingThread();
+ if (executingThread.getState() == Thread.State.TERMINATED) {
+ return true;
+ }
+
+ return false;
+ }
/**
* Starts the execution of this task.
*/
- void startExecution();
+ public void startExecution() {
+
+ final Thread thread = this.environment.getExecutingThread();
+ thread.start();
+ }
/**
* Cancels the execution of the task (i.e. interrupts the execution thread).
*/
- void cancelExecution();
+ public void cancelExecution() {
+ final Thread executingThread = this.environment.getExecutingThread();
+
+ if (executingThread == null) {
+ return;
+ }
+
+ LOG.info("Canceling " + this.environment.getTaskNameWithIndex());
+
+ this.isCanceled = true;
+ // Change state
+ executionStateChanged(ExecutionState.CANCELING, null);
+
+ // Request user code to shut down
+ try {
+ final AbstractInvokable invokable = this.environment.getInvokable();
+ if (invokable != null) {
+ invokable.cancel();
+ }
+ } catch (Throwable e) {
+ LOG.error("Error while canceling task", e);
+ }
+
+ // Keep interrupting the user thread until it is no longer alive
+ while (true) {
+ executingThread.interrupt();
+
+ if (!executingThread.isAlive()) {
+ break;
+ }
+
+ try {
+ executingThread.join(1000);
+ } catch (InterruptedException e) {}
+
+ if (!executingThread.isAlive()) {
+ break;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Sending repeated canceling signal to " +
+ this.environment.getTaskName() + " with state " + this.executionState);
+ }
+ }
- /**
- * Kills the task (i.e. interrupts the execution thread).
- */
- void killExecution();
/**
* Registers the task manager profiler with the task.
@@ -83,7 +183,9 @@ public interface Task {
* @param jobConfiguration
* the configuration attached to the job
*/
- void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration);
+ public void registerProfiler(final TaskManagerProfiler taskManagerProfiler, final Configuration jobConfiguration) {
+ taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
+ }
/**
* Unregisters the task from the central memory manager.
@@ -91,7 +193,11 @@ public interface Task {
* @param memoryManager
* the central memory manager
*/
- void unregisterMemoryManager(MemoryManager memoryManager);
+ public void unregisterMemoryManager(final MemoryManager memoryManager) {
+ if (memoryManager != null) {
+ memoryManager.releaseAll(this.environment.getInvokable());
+ }
+ }
/**
* Unregisters the task from the task manager profiler.
@@ -99,15 +205,124 @@ public interface Task {
* @param taskManagerProfiler
* the task manager profiler
*/
- void unregisterProfiler(TaskManagerProfiler taskManagerProfiler);
+ public void unregisterProfiler(final TaskManagerProfiler taskManagerProfiler) {
+ if (taskManagerProfiler != null) {
+ taskManagerProfiler.unregisterExecutionListener(this.vertexID);
+ }
+ }
/**
* Returns the current execution state of the task.
*
* @return the current execution state of the task
*/
- ExecutionState getExecutionState();
+ public ExecutionState getExecutionState() {
+ return this.executionState;
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ // ExecutionObserver methods
+ // -----------------------------------------------------------------------------------------------------------------
+ @Override
+ public void executionStateChanged(final ExecutionState newExecutionState, final String optionalMessage) {
+
+ // Check the state transition
+ ExecutionStateTransition.checkTransition(false, getTaskName(), this.executionState, newExecutionState);
+
+ // Make sure the reason for a transition to FAILED appears in the log files
+ if (newExecutionState == ExecutionState.FAILED) {
+ LOG.error(optionalMessage);
+ }
+
+ // Notify all listener objects
+ final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+ while (it.hasNext()) {
+ it.next().executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
+ optionalMessage);
+ }
+
+ // Store the new execution state
+ this.executionState = newExecutionState;
+
+ // Finally propagate the state change to the job manager
+ this.taskManager.executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
+ optionalMessage);
+ }
+
+ /**
+ * Returns the task name followed by its subtask index (plus one) and the current number of subtasks.
+ *
+ * @return the task name in the form <code>name (index/total)</code>
+ */
+ private String getTaskName() {
+
+ return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/"
+ + this.environment.getCurrentNumberOfSubtasks() + ")";
+ }
+
+
+ @Override
+ public void userThreadStarted(final Thread userThread) {
+
+ // Notify the listeners
+ final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+ while (it.hasNext()) {
+ it.next().userThreadStarted(this.environment.getJobID(), this.vertexID, userThread);
+ }
+ }
+
+
+ @Override
+ public void userThreadFinished(final Thread userThread) {
+
+ // Notify the listeners
+ final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
+ while (it.hasNext()) {
+ it.next().userThreadFinished(this.environment.getJobID(), this.vertexID, userThread);
+ }
+ }
+
+ /**
+ * Registers the {@link ExecutionListener} object for this task. This object
+ * will be notified about important events during the task execution.
+ *
+ * @param executionListener
+ * the object to be notified for important events during the task execution
+ */
+
+ public void registerExecutionListener(final ExecutionListener executionListener) {
+
+ this.registeredListeners.add(executionListener);
+ }
+
+ /**
+ * Unregisters the {@link ExecutionListener} object for this task. This object
+ * will no longer be notified about important events during the task execution.
+ *
+ * @param executionListener
+ * the listener object to be unregistered
+ */
+
+ public void unregisterExecutionListener(final ExecutionListener executionListener) {
+
+ this.registeredListeners.remove(executionListener);
+ }
+
+
+ @Override
+ public boolean isCanceled() {
+
+ return this.isCanceled;
+ }
+
+ /**
+ * Returns the runtime environment associated with this task.
+ *
+ * @return the runtime environment associated with this task
+ */
+ public RuntimeEnvironment getRuntimeEnvironment() {
+
+ return this.environment;
+ }
- TaskContext createTaskContext(TransferEnvelopeDispatcher transferEnvelopeDispatcher,
- LocalBufferPoolOwner previousBufferPoolOwner);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
deleted file mode 100644
index 3c0002c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager;
-
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-
-/**
- * A <code>TaskKillResult</code> is used to report the results
- * of a task kill attempt. It contains the ID of the task to be killed, a return code and
- * a description. In case of an error during the kill operation the description includes an error message.
- *
- */
-public class TaskKillResult extends AbstractTaskResult {
-
- /**
- * Constructs a new task kill result.
- *
- * @param vertexID
- * the task ID this result belongs to
- * @param returnCode
- * the return code of the kill
- */
- public TaskKillResult(final ExecutionVertexID vertexID, final ReturnCode returnCode) {
- super(vertexID, returnCode);
- }
-
- /**
- * Constructs an empty task kill result.
- */
- public TaskKillResult() {
- super();
- }
-}