You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/09 20:30:42 UTC
[07/30] git commit: Replace custom Java NIO TCP/IP code with Netty 4 library
Replace custom Java NIO TCP/IP code with Netty 4 library
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4cd4a134
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4cd4a134
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4cd4a134
Branch: refs/heads/master
Commit: 4cd4a13415d609a2979c8fa3cf4b797c990ee8c2
Parents: 2db78a8
Author: uce <u....@fu-berlin.de>
Authored: Fri May 9 13:39:15 2014 +0200
Committer: StephanEwen <st...@tu-berlin.de>
Committed: Sat Jun 7 09:41:21 2014 +0200
----------------------------------------------------------------------
.../configuration/ConfigConstants.java | 58 +-
stratosphere-runtime/pom.xml | 8 +-
.../eu/stratosphere/nephele/AbstractID.java | 6 +
.../nephele/execution/CancelTaskException.java | 3 -
.../nephele/execution/Environment.java | 76 +-
.../execution/ExecutionStateTransition.java | 2 +
.../nephele/execution/RuntimeEnvironment.java | 170 ++--
.../nephele/executiongraph/ExecutionVertex.java | 36 +
.../nephele/instance/AbstractInstance.java | 16 +
.../eu/stratosphere/nephele/jobgraph/JobID.java | 11 +-
.../nephele/jobmanager/JobManager.java | 34 +
.../jobmanager/scheduler/RecoveryLogic.java | 5 +-
.../protocols/ExtendedManagementProtocol.java | 13 +
.../protocols/TaskOperationProtocol.java | 12 +
.../stratosphere/nephele/taskmanager/Task.java | 93 +-
.../nephele/taskmanager/TaskKillResult.java | 44 +
.../nephele/taskmanager/TaskManager.java | 32 +-
.../runtime/ExecutorThreadFactory.java | 35 +
.../task/AbstractIterativePactTask.java | 9 +-
.../iterative/task/IterationHeadPactTask.java | 3 +-
.../pact/runtime/shipping/OutputEmitter.java | 1 -
.../runtime/shipping/RecordOutputEmitter.java | 3 -
.../pact/runtime/task/DataSinkTask.java | 3 +-
.../pact/runtime/task/RegularPactTask.java | 8 +-
.../java/eu/stratosphere/runtime/io/Buffer.java | 5 +-
.../runtime/io/channels/Channel.java | 4 +-
.../runtime/io/channels/ChannelID.java | 10 +-
.../runtime/io/channels/ChannelType.java | 5 +
.../runtime/io/channels/InputChannel.java | 17 +-
.../runtime/io/channels/OutputChannel.java | 5 +-
.../runtime/io/gates/InputGate.java | 7 +-
.../runtime/io/gates/OutputGate.java | 1 -
.../runtime/io/network/ChannelManager.java | 50 +-
.../runtime/io/network/Envelope.java | 178 ++++
.../runtime/io/network/EnvelopeDispatcher.java | 46 +
.../io/network/EnvelopeReceiverList.java | 75 ++
.../io/network/NetworkConnectionManager.java | 176 ----
.../runtime/io/network/SenderHintEvent.java | 1 -
.../BufferAvailabilityListener.java | 17 +-
.../network/bufferprovider/BufferProvider.java | 18 +-
.../bufferprovider/DiscardBufferPool.java | 51 ++
.../network/bufferprovider/LocalBufferPool.java | 26 +-
.../bufferprovider/SerialSingleBufferPool.java | 77 --
.../runtime/io/network/envelope/Envelope.java | 169 ----
.../io/network/envelope/EnvelopeDispatcher.java | 46 -
.../io/network/envelope/EnvelopeReader.java | 212 -----
.../network/envelope/EnvelopeReceiverList.java | 75 --
.../io/network/envelope/EnvelopeWriter.java | 134 ---
.../envelope/NoBufferAvailableException.java | 53 --
.../network/netty/InboundEnvelopeDecoder.java | 344 ++++++++
.../netty/InboundEnvelopeDispatcherHandler.java | 41 +
.../network/netty/NettyConnectionManager.java | 251 ++++++
.../network/netty/OutboundConnectionQueue.java | 94 ++
.../network/netty/OutboundEnvelopeEncoder.java | 65 ++
.../io/network/tcp/IncomingConnection.java | 115 ---
.../network/tcp/IncomingConnectionThread.java | 226 -----
.../io/network/tcp/OutgoingConnection.java | 529 ------------
.../network/tcp/OutgoingConnectionThread.java | 276 ------
.../AdaptiveSpanningRecordDeserializer.java | 32 +-
.../io/serialization/DataInputDeserializer.java | 25 +-
.../io/serialization/DataOutputSerializer.java | 7 +-
.../serialization/SpanningRecordSerializer.java | 7 +-
.../nephele/util/TestBufferProvider.java | 2 +-
.../runtime/task/util/OutputEmitterTest.java | 1 -
.../task/util/RecordOutputEmitterTest.java | 2 -
.../pact/runtime/test/util/MockEnvironment.java | 11 +-
.../envelope/EnvelopeReaderWriterTest.java | 394 ---------
.../netty/InboundEnvelopeDecoderTest.java | 857 +++++++++++++++++++
.../netty/NettyConnectionManagerTest.java | 196 +++++
.../netty/OutboundEnvelopeEncoderTest.java | 97 +++
.../KMeansIterativeNepheleITCase.java | 3 +-
.../clients/examples/LocalExecutorITCase.java | 3 +-
.../exampleJavaPrograms/WordCountITCase.java | 5 +-
.../ConnectedComponentsNepheleITCase.java | 32 +-
.../IterationWithChainingNepheleITCase.java | 12 +-
.../WordCountUnionReduceITCase.java | 4 -
.../test/runtime/NetworkStackNepheleITCase.java | 286 +++++++
77 files changed, 3236 insertions(+), 2820 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index 3b9ba3d..96c8965 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -34,7 +34,6 @@ public final class ConfigConstants {
* The config parameter defining the maximal intra-node parallelism for jobs.
*/
public static final String PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY = "parallelization.intra-node.default";
-
// -------------------------------- Runtime -------------------------------
@@ -98,6 +97,26 @@ public final class ConfigConstants {
* The config parameter defining the size of the buffers used in the network stack.
*/
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
+
+ /**
+ * The number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap.
+ */
+ public static final String TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY = "taskmanager.netty.numInThreads";
+
+ /**
+ * The number of outgoing connection threads used in NettyConnectionManager for the Bootstrap.
+ */
+ public static final String TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY = "taskmanager.netty.numOutThreads";
+
+ /**
+ * The low water mark used in NettyConnectionManager for the Bootstrap.
+ */
+ public static final String TASK_MANAGER_NETTY_LOW_WATER_MARK = "taskmanager.netty.lowWaterMark";
+
+ /**
+ * The high water mark used in NettyConnectionManager for the Bootstrap.
+ */
+ public static final String TASK_MANAGER_NETTY_HIGH_WATER_MARK = "taskmanager.netty.highWaterMark";
/**
* Parameter for the interval in which the TaskManager sends the periodic heart beat messages
@@ -134,10 +153,9 @@ public final class ConfigConstants {
* The parameter defining the polling interval (in seconds) for the JobClient.
*/
public static final String JOBCLIENT_POLLING_INTERVAL_KEY = "jobclient.polling.interval";
-
-
+
// ------------------------ Hadoop Configuration ------------------------
-
+
/**
* Path to hdfs-default.xml file
*/
@@ -153,7 +171,6 @@ public final class ConfigConstants {
*/
public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
-
// ------------------------ File System Behavior ------------------------
/**
@@ -251,11 +268,7 @@ public final class ConfigConstants {
public static final String STRATOSPHERE_BASE_DIR_PATH_KEY = "stratosphere.base.dir.path";
public static final String STRATOSPHERE_JVM_OPTIONS = "env.java.opts";
-
-
-
-
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
@@ -318,7 +331,31 @@ public final class ConfigConstants {
* Default size of network stack buffers.
*/
public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
-
+
+ /**
+ * Default number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap. If set
+ * to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
+ */
+ public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS = -1;
+
+ /**
+ * Default number of outgoing connection threads used in NettyConnectionManager for the Bootstrap. If set
+ * to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
+ */
+ public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS = -1;
+
+ /**
+ * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+ * will use half of the network buffer size as the low water mark.
+ */
+ public static final int DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK = -1;
+
+ /**
+ * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
+ * will use the network buffer size as the high water mark.
+ */
+ public static final int DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK = -1;
+
/**
* The default interval for TaskManager heart beats (2000 msecs).
*/
@@ -452,7 +489,6 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
-
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml
index bb5ca2a..074725b 100644
--- a/stratosphere-runtime/pom.xml
+++ b/stratosphere-runtime/pom.xml
@@ -52,7 +52,13 @@
<artifactId>aws-java-sdk</artifactId>
<version>1.2.1</version>
</dependency>
-
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.19.Final</version>
+ </dependency>
+
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
index 476e22a..648c8dc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.util.StringUtils;
+import io.netty.buffer.ByteBuf;
/**
* A statistically unique identification number.
@@ -167,6 +168,11 @@ public class AbstractID implements IOReadableWritable {
buffer.putLong(this.upperPart);
}
+ public void writeTo(ByteBuf buf) {
+ buf.writeLong(this.lowerPart);
+ buf.writeLong(this.upperPart);
+ }
+
@Override
public String toString() {
final byte[] ba = new byte[SIZE];
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
index 6a09e89..aa196da 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/CancelTaskException.java
@@ -14,9 +14,6 @@
**********************************************************************************************************************/
package eu.stratosphere.nephele.execution;
-
-import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
-
/**
* Thrown to trigger a canceling of the executing task. Intended to cause a cancelled status, rather than a failed status.
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
index e0bcc70..05a420c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/Environment.java
@@ -16,17 +16,21 @@ package eu.stratosphere.nephele.execution;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.gates.GateID;
-import eu.stratosphere.runtime.io.gates.InputGate;
-import eu.stratosphere.runtime.io.gates.OutputGate;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.FutureTask;
/**
* The user code of every Nephele task runs inside an <code>Environment</code> object. The environment provides
@@ -38,114 +42,112 @@ public interface Environment {
* Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
* required
* libraries for executing the assigned Nephele task.
- *
+ *
* @return the ID of the job from the original job graph
*/
JobID getJobID();
/**
* Returns the task configuration object which was attached to the original JobVertex.
- *
+ *
* @return the task configuration object which was attached to the original JobVertex.
*/
Configuration getTaskConfiguration();
/**
* Returns the job configuration object which was attached to the original {@link JobGraph}.
- *
+ *
* @return the job configuration object which was attached to the original {@link JobGraph}
*/
Configuration getJobConfiguration();
/**
* Returns the current number of subtasks the respective task is split into.
- *
+ *
* @return the current number of subtasks the respective task is split into
*/
int getCurrentNumberOfSubtasks();
/**
* Returns the index of this subtask in the subtask group.
- *
+ *
* @return the index of this subtask in the subtask group
*/
int getIndexInSubtaskGroup();
/**
* Sends a notification that a new user thread has been started to the execution observer.
- *
- * @param userThread
- * the user thread which has been started
+ *
+ * @param userThread the user thread which has been started
*/
void userThreadStarted(Thread userThread);
/**
* Sends a notification that a user thread has finished to the execution observer.
- *
- * @param userThread
- * the user thread which has finished
+ *
+ * @param userThread the user thread which has finished
*/
void userThreadFinished(Thread userThread);
/**
* Returns the input split provider assigned to this environment.
- *
+ *
* @return the input split provider or <code>null</code> if no such provider has been assigned to this environment.
*/
InputSplitProvider getInputSplitProvider();
/**
* Returns the current {@link IOManager}.
- *
+ *
* @return the current {@link IOManager}.
*/
IOManager getIOManager();
/**
* Returns the current {@link MemoryManager}.
- *
+ *
* @return the current {@link MemoryManager}.
*/
MemoryManager getMemoryManager();
/**
* Returns the name of the task running in this environment.
- *
+ *
* @return the name of the task running in this environment
*/
String getTaskName();
/**
* Returns the next unbound input gate ID or <code>null</code> if no such ID exists
- *
+ *
* @return the next unbound input gate ID or <code>null</code> if no such ID exists
*/
GateID getNextUnboundInputGateID();
/**
* Returns the number of output gates registered with this environment.
- *
+ *
* @return the number of output gates registered with this environment
*/
int getNumberOfOutputGates();
/**
* Returns the number of input gates registered with this environment.
- *
+ *
* @return the number of input gates registered with this environment
*/
int getNumberOfInputGates();
/**
* Returns the number of output channels attached to this environment.
- *
+ *
* @return the number of output channels attached to this environment
*/
int getNumberOfOutputChannels();
/**
* Returns the number of input channels attached to this environment.
- *
+ *
* @return the number of input channels attached to this environment
*/
int getNumberOfInputChannels();
@@ -164,50 +166,48 @@ public interface Environment {
/**
* Returns the IDs of all output channels connected to this environment.
- *
+ *
* @return the IDs of all output channels connected to this environment
*/
Set<ChannelID> getOutputChannelIDs();
/**
* Returns the IDs of all input channels connected to this environment.
- *
+ *
* @return the IDs of all input channels connected to this environment
*/
Set<ChannelID> getInputChannelIDs();
/**
* Returns the IDs of all output gates connected to this environment.
- *
+ *
* @return the IDs of all output gates connected to this environment
*/
Set<GateID> getOutputGateIDs();
/**
* Returns the IDs of all input gates connected to this environment.
- *
+ *
* @return the IDs of all input gates connected to this environment
*/
Set<GateID> getInputGateIDs();
/**
* Returns the IDs of all the output channels connected to the gate with the given ID.
- *
- * @param gateID
- * the gate ID
+ *
+ * @param gateID the gate ID
* @return the IDs of all the output channels connected to the gate with the given ID
*/
Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID);
/**
* Returns the IDs of all the input channels connected to the gate with the given ID.
- *
- * @param gateID
- * the gate ID
+ *
+ * @param gateID the gate ID
* @return the IDs of all the input channels connected to the gate with the given ID
*/
Set<ChannelID> getInputChannelIDsOfGate(GateID gateID);
-
+
/**
* Returns the proxy object for the accumulator protocol.
*/
@@ -215,11 +215,13 @@ public interface Environment {
/**
* Returns the buffer provider for this environment.
- * <p>
+ * <p/>
* The returned buffer provider is used by the output side of the network stack.
*
* @return Buffer provider for the output side of the network stack
* @see eu.stratosphere.runtime.io.api.RecordWriter
*/
BufferProvider getOutputBufferProvider();
+
+ Map<String, FutureTask<Path>> getCopyTask();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
index 55f036a..87674c8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/ExecutionStateTransition.java
@@ -14,6 +14,8 @@
package eu.stratosphere.nephele.execution;
import static eu.stratosphere.nephele.execution.ExecutionState.FAILED;
+import static eu.stratosphere.nephele.execution.ExecutionState.CANCELED;
+import static eu.stratosphere.nephele.execution.ExecutionState.CANCELING;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 59787d2..29d6853 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -13,30 +13,11 @@
package eu.stratosphere.nephele.execution;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.FutureTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
-import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
import eu.stratosphere.nephele.deployment.TaskDeploymentDescriptor;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.protocols.AccumulatorProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
@@ -45,7 +26,6 @@ import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.runtime.io.channels.OutputChannel;
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
@@ -56,12 +36,27 @@ import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.FutureTask;
/**
* The user code of every Nephele task runs inside a <code>RuntimeEnvironment</code> object. The environment provides
* important services to the task. It keeps track of setting up the communication channels and provides access to input
* splits, memory manager, etc.
- * <p>
+ * <p/>
* This class is thread-safe.
*/
public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
@@ -141,7 +136,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
* The observer object for the task's execution.
*/
private volatile ExecutionObserver executionObserver = null;
-
+
/**
* The RPC proxy to report accumulators to JobManager
*/
@@ -164,26 +159,22 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private LocalBufferPool outputBufferPool;
+ private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+
/**
* Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
* task.
- *
- * @param jobID
- * the ID of the original Nephele job
- * @param taskName
- * the name of task running in this environment
- * @param invokableClass
- * invokableClass the class that should be instantiated as a Nephele task
- * @param taskConfiguration
- * the configuration object which was attached to the original JobVertex
- * @param jobConfiguration
- * the configuration object which was attached to the original {@link JobGraph}
- * @throws Exception
- * thrown if an error occurs while instantiating the invokable class
+ *
+ * @param jobID the ID of the original Nephele job
+ * @param taskName the name of task running in this environment
+ * @param invokableClass the class that should be instantiated as a Nephele task
+ * @param taskConfiguration the configuration object which was attached to the original JobVertex
+ * @param jobConfiguration the configuration object which was attached to the original JobGraph
+ * @throws Exception thrown if an error occurs while instantiating the invokable class
*/
public RuntimeEnvironment(final JobID jobID, final String taskName,
- final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
- final Configuration jobConfiguration) throws Exception {
+ final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
+ final Configuration jobConfiguration) throws Exception {
this.jobID = jobID;
this.taskName = taskName;
@@ -203,23 +194,18 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Constructs a runtime environment from a task deployment description.
- *
- * @param tdd
- * the task deployment description
- * @param memoryManager
- * the task manager's memory manager component
- * @param ioManager
- * the task manager's I/O manager component
- * @param inputSplitProvider
- * the input split provider for this environment
- * @throws Exception
- * thrown if an error occurs while instantiating the invokable class
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ *
+ * @param tdd the task deployment description
+ * @param memoryManager the task manager's memory manager component
+ * @param ioManager the task manager's I/O manager component
+ * @param inputSplitProvider the input split provider for this environment
+ * @throws Exception thrown if an error occurs while instantiating the invokable class
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
- final MemoryManager memoryManager, final IOManager ioManager,
- final InputSplitProvider inputSplitProvider,
- AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
+ final MemoryManager memoryManager, final IOManager ioManager,
+ final InputSplitProvider inputSplitProvider,
+ AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks) throws Exception {
this.jobID = tdd.getJobID();
this.taskName = tdd.getTaskName();
@@ -246,14 +232,14 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
int numInputGates = tdd.getNumberOfInputGateDescriptors();
- for(int i = 0; i < numInputGates; i++){
+ for (int i = 0; i < numInputGates; i++) {
this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
}
}
/**
* Returns the invokable object that represents the Nephele task.
- *
+ *
* @return the invokable object that represents the Nephele task
*/
public AbstractInvokable getInvokable() {
@@ -272,7 +258,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public OutputGate createAndRegisterOutputGate() {
- OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
+ OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
this.outputGates.add(gate);
return gate;
@@ -318,7 +304,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
changeExecutionState(ExecutionState.CANCELED, null);
- } else {
+ }
+ else {
changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
}
@@ -347,7 +334,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (this.executionObserver.isCanceled() || t instanceof CancelTaskException) {
changeExecutionState(ExecutionState.CANCELED, null);
- } else {
+ }
+ else {
changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));
}
@@ -400,9 +388,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Returns the registered input gate with index <code>pos</code>.
- *
- * @param pos
- * the index of the input gate to return
+ *
+ * @param pos the index of the input gate to return
* @return the input gate at index <code>pos</code> or <code>null</code> if no such index exists
*/
public InputGate<? extends IOReadableWritable> getInputGate(final int pos) {
@@ -415,9 +402,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Returns the registered output gate with index <code>pos</code>.
- *
- * @param index
- * the index of the output gate to return
+ *
+ * @param index the index of the output gate to return
* @return the output gate at index <code>pos</code> or <code>null</code> if no such index exists
*/
public OutputGate getOutputGate(int index) {
@@ -430,7 +416,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Returns the thread which is assigned to execute the user code.
- *
+ *
* @return the thread which is assigned to execute the user code
*/
public Thread getExecutingThread() {
@@ -439,7 +425,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (this.executingThread == null) {
if (this.taskName == null) {
this.executingThread = new Thread(this);
- } else {
+ }
+ else {
this.executingThread = new Thread(this, getTaskNameWithIndex());
}
}
@@ -450,11 +437,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Blocks until all output channels are closed.
- *
- * @throws IOException
- * thrown if an error occurred while closing the output channels
- * @throws InterruptedException
- * thrown if the thread waiting for the channels to be closed is interrupted
+ *
+ * @throws IOException thrown if an error occurred while closing the output channels
+ * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
*/
private void waitForOutputChannelsToBeClosed() throws InterruptedException {
// Make sure, we leave this method with an InterruptedException when the task has been canceled
@@ -469,11 +454,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Blocks until all input channels are closed.
- *
- * @throws IOException
- * thrown if an error occurred while closing the input channels
- * @throws InterruptedException
- * thrown if the thread waiting for the channels to be closed is interrupted
+ *
+ * @throws IOException thrown if an error occurred while closing the input channels
+ * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
*/
private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
// Wait for disconnection of all output gates
@@ -494,7 +477,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
if (allClosed) {
break;
- } else {
+ }
+ else {
Thread.sleep(SLEEPINTERVAL);
}
}
@@ -564,7 +548,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Returns the name of the task with its index in the subtask group and the total number of subtasks.
- *
+ *
* @return the name of the task with its index in the subtask group and the total number of subtasks
*/
public String getTaskNameWithIndex() {
@@ -573,9 +557,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/**
* Sets the execution observer for this environment.
- *
- * @param executionObserver
- * the execution observer for this environment
+ *
+ * @param executionObserver the execution observer for this environment
*/
public void setExecutionObserver(final ExecutionObserver executionObserver) {
this.executionObserver = executionObserver;
@@ -616,7 +599,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public Set<ChannelID> getOutputChannelIDs() {
- Set<ChannelID> ids= new HashSet<ChannelID>();
+ Set<ChannelID> ids = new HashSet<ChannelID>();
for (OutputGate gate : this.outputGates) {
for (OutputChannel channel : gate.channels()) {
@@ -726,38 +709,47 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
public List<InputGate<? extends IOReadableWritable>> inputGates() {
return this.inputGates;
}
-
+
@Override
public AccumulatorProtocol getAccumulatorProtocolProxy() {
return accumulatorProtocolProxy;
}
+ public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
+ this.cacheCopyTasks.put(name, copyTask);
+ }
+
+ @Override
+ public Map<String, FutureTask<Path>> getCopyTask() {
+ return this.cacheCopyTasks;
+ }
+
@Override
public BufferProvider getOutputBufferProvider() {
return this;
}
-
+
// -----------------------------------------------------------------------------------------------------------------
// BufferProvider methods
// -----------------------------------------------------------------------------------------------------------------
-
+
@Override
public Buffer requestBuffer(int minBufferSize) throws IOException {
return this.outputBufferPool.requestBuffer(minBufferSize);
}
-
+
@Override
public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
return this.outputBufferPool.requestBufferBlocking(minBufferSize);
}
-
+
@Override
public int getBufferSize() {
return this.outputBufferPool.getBufferSize();
}
-
+
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
return this.outputBufferPool.registerBufferAvailabilityListener(listener);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
index 57ff073..8e9395a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -691,6 +692,41 @@ public final class ExecutionVertex {
}
/**
+ * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
+ * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
+ * executed
+ * successfully, the task will change to the state <code>FAILED</code>.
+ *
+ * @return the result of the task kill attempt
+ */
+ public TaskKillResult killTask() {
+
+ final ExecutionState state = this.executionState.get();
+
+ if (state != ExecutionState.RUNNING) {
+ final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.ILLEGAL_STATE);
+ result.setDescription("Vertex " + this.toString() + " is in state " + state);
+ return result;
+ }
+
+ final AllocatedResource ar = this.allocatedResource.get();
+
+ if (ar == null) {
+ final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.NO_INSTANCE);
+ result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
+ return result;
+ }
+
+ try {
+ return ar.getInstance().killTask(this.vertexID);
+ } catch (IOException e) {
+ final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.ReturnCode.IPC_ERROR);
+ result.setDescription(StringUtils.stringifyException(e));
+ return result;
+ }
+ }
+
+ /**
* Cancels and removes the task represented by this vertex
* from the instance it is currently running on. If the task
* is not currently running, its execution state is simply
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
index 50e0e7f..56b4eae 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/AbstractInstance.java
@@ -24,6 +24,7 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobID;
@@ -205,6 +206,21 @@ public abstract class AbstractInstance extends NetworkNode {
return getTaskManagerProxy().cancelTask(id);
}
+ /**
+ * Kills the task identified by the given ID at the instance's
+ * {@link eu.stratosphere.nephele.taskmanager.TaskManager}.
+ *
+ * @param id
+ * the ID identifying the task to be killed
+ * @throws IOException
+ * thrown if an error occurs while transmitting the request or receiving the response
+ * @return the result of the kill attempt
+ */
+ public synchronized TaskKillResult killTask(final ExecutionVertexID id) throws IOException {
+
+ return getTaskManagerProxy().killTask(id);
+ }
+
@Override
public boolean equals(final Object obj) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
index 2ec2ed6..e32df61 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobID.java
@@ -13,11 +13,10 @@
package eu.stratosphere.nephele.jobgraph;
-import java.nio.ByteBuffer;
+import eu.stratosphere.nephele.AbstractID;
import javax.xml.bind.DatatypeConverter;
-
-import eu.stratosphere.nephele.AbstractID;
+import java.nio.ByteBuffer;
public final class JobID extends AbstractID {
@@ -44,6 +43,12 @@ public final class JobID extends AbstractID {
return new JobID(bytes);
}
+ public static JobID fromByteBuffer(ByteBuffer buf) {
+ long lower = buf.getLong();
+ long upper = buf.getLong();
+ return new JobID(lower, upper);
+ }
+
public static JobID fromByteBuffer(ByteBuffer buf, int offset) {
long lower = buf.getLong(offset);
long upper = buf.getLong(offset + 8);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 846ca2e..3ae9f3b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -893,6 +895,38 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
@Override
+ public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException {
+
+ final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID);
+ if (eg == null) {
+ LOG.error("Cannot find execution graph for job " + jobID);
+ return;
+ }
+
+ final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id));
+ if (vertex == null) {
+ LOG.error("Cannot find execution vertex with ID " + id);
+ return;
+ }
+
+ LOG.info("Killing task " + vertex + " of job " + jobID);
+
+ final Runnable runnable = new Runnable() {
+
+ @Override
+ public void run() {
+
+ final TaskKillResult result = vertex.killTask();
+ if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) {
+ LOG.error(result.getDescription());
+ }
+ }
+ };
+
+ eg.executeCommand(runnable);
+ }
+
+ @Override
public void killInstance(final StringRecord instanceName) throws IOException {
final AbstractInstance instance = this.instanceManager.getInstanceByName(instanceName.toString());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
index 762b494..e369613 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/scheduler/RecoveryLogic.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,8 +88,8 @@ public final class RecoveryLogic {
verticesToBeRestarted.put(vertex.getID(), vertex);
final TaskCancelResult cancelResult = vertex.cancelTask();
- if (cancelResult.getReturnCode() != ReturnCode.SUCCESS
- && cancelResult.getReturnCode() != ReturnCode.TASK_NOT_FOUND) {
+ if (cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS
+ && cancelResult.getReturnCode() != AbstractTaskResult.ReturnCode.TASK_NOT_FOUND) {
verticesToBeRestarted.remove(vertex.getID());
LOG.error("Unable to cancel vertex" + cancelResult.getDescription());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
index 461c797..59ec15d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/ExtendedManagementProtocol.java
@@ -24,6 +24,7 @@ import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
+import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.topology.NetworkTopology;
/**
@@ -81,6 +82,18 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
List<AbstractEvent> getEvents(JobID jobID) throws IOException;
/**
+ * Kills the task with the given vertex ID.
+ *
+ * @param jobID
+ * the ID of the job the vertex to be killed belongs to
+ * @param id
+ * the vertex ID which identifies the task to be killed
+ * @throws IOException
+ * thrown if an error occurs while transmitting the kill request
+ */
+ void killTask(JobID jobID, ManagementVertexID id) throws IOException;
+
+ /**
* Kills the instance with the given name (i.e. shuts down its task manager).
*
* @param instanceName
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
index 19522db..93d8cdf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/TaskOperationProtocol.java
@@ -23,6 +23,7 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
@@ -58,6 +59,17 @@ public interface TaskOperationProtocol extends VersionedProtocol {
TaskCancelResult cancelTask(ExecutionVertexID id) throws IOException;
/**
+ * Advises the task manager to kill the task with the given ID.
+ *
+ * @param id
+ * the ID of the task to kill
+ * @return the result of the task kill attempt
+ * @throws IOException
+ * thrown if an error occurs during this remote procedure call
+ */
+ TaskKillResult killTask(ExecutionVertexID id) throws IOException;
+
+ /**
* Queries the task manager about the cache status of the libraries stated in the {@link LibraryCacheProfileRequest}
* object.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index 06eec0c..825eae1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -25,6 +25,7 @@ import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,9 +58,7 @@ public final class Task implements ExecutionObserver {
private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
- public Task(final ExecutionVertexID vertexID, final RuntimeEnvironment environment,
- final TaskManager taskManager) {
-
+ public Task(ExecutionVertexID vertexID, final RuntimeEnvironment environment, TaskManager taskManager) {
this.vertexID = vertexID;
this.environment = environment;
this.taskManager = taskManager;
@@ -102,58 +101,51 @@ public final class Task implements ExecutionObserver {
executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
}
- /**
- * Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
- *
- * @return <code>true</code> if the state of this thread which is associated with this task is
- * <code>TERMINATED</code>, <code>false</code> otherwise
- */
- public boolean isTerminated() {
- final Thread executingThread = this.environment.getExecutingThread();
- if (executingThread.getState() == Thread.State.TERMINATED) {
- return true;
- }
-
- return false;
+ public void cancelExecution() {
+ cancelOrKillExecution(true);
}
- /**
- * Starts the execution of this task.
- */
- public void startExecution() {
-
- final Thread thread = this.environment.getExecutingThread();
- thread.start();
+ public void killExecution() {
+ cancelOrKillExecution(false);
}
/**
- * Cancels the execution of the task (i.e. interrupts the execution thread).
+ * Cancels or kills the task.
+ *
+ * @param cancel <code>true</code> if the task shall be canceled, <code>false</code> if it shall be killed
*/
- public void cancelExecution() {
+ private void cancelOrKillExecution(boolean cancel) {
final Thread executingThread = this.environment.getExecutingThread();
if (executingThread == null) {
return;
}
- LOG.info("Canceling " + this.environment.getTaskNameWithIndex());
+ if (this.executionState != ExecutionState.RUNNING && this.executionState != ExecutionState.FINISHING) {
+ return;
+ }
+
+ LOG.info((cancel ? "Canceling " : "Killing ") + this.environment.getTaskNameWithIndex());
- this.isCanceled = true;
- // Change state
- executionStateChanged(ExecutionState.CANCELING, null);
+ if (cancel) {
+ this.isCanceled = true;
+ // Change state
+ executionStateChanged(ExecutionState.CANCELING, null);
- // Request user code to shut down
- try {
- final AbstractInvokable invokable = this.environment.getInvokable();
- if (invokable != null) {
- invokable.cancel();
+ // Request user code to shut down
+ try {
+ final AbstractInvokable invokable = this.environment.getInvokable();
+ if (invokable != null) {
+ invokable.cancel();
+ }
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
}
- } catch (Throwable e) {
- LOG.error("Error while canceling task", e);
}
// Continuously interrupt the user thread until it changed to state CANCELED
while (true) {
+
executingThread.interrupt();
if (!executingThread.isAlive()) {
@@ -168,12 +160,36 @@ public final class Task implements ExecutionObserver {
break;
}
- if (LOG.isDebugEnabled())
- LOG.debug("Sending repeated canceling signal to " +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending repeated " + (cancel == true ? "canceling" : "killing") + " signal to " +
this.environment.getTaskName() + " with state " + this.executionState);
+ }
}
}
+ /**
+ * Checks if the state of the thread which is associated with this task is <code>TERMINATED</code>.
+ *
+ * @return <code>true</code> if the state of this thread which is associated with this task is
+ * <code>TERMINATED</code>, <code>false</code> otherwise
+ */
+ public boolean isTerminated() {
+ final Thread executingThread = this.environment.getExecutingThread();
+ if (executingThread.getState() == Thread.State.TERMINATED) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Starts the execution of this task.
+ */
+ public void startExecution() {
+
+ final Thread thread = this.environment.getExecutingThread();
+ thread.start();
+ }
/**
* Registers the task manager profiler with the task.
@@ -324,5 +340,4 @@ public final class Task implements ExecutionObserver {
return this.environment;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
new file mode 100644
index 0000000..3c0002c
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskKillResult.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.taskmanager;
+
+import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
+
+/**
+ * A <code>TaskKillResult</code> is used to report the results
+ * of a task kill attempt. It contains the ID of the task to be killed, a return code and
+ * a description. In case of an error during the kill operation the description includes an error message.
+ *
+ */
+public class TaskKillResult extends AbstractTaskResult {
+
+ /**
+ * Constructs a new task kill result.
+ *
+ * @param vertexID
+ * the task ID this result belongs to
+ * @param returnCode
+ * the return code of the kill
+ */
+ public TaskKillResult(final ExecutionVertexID vertexID, final ReturnCode returnCode) {
+ super(vertexID, returnCode);
+ }
+
+ /**
+ * Constructs an empty task kill result.
+ */
+ public TaskKillResult() {
+ super();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 5240fc8..3b478cf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -126,7 +126,7 @@ public class TaskManager implements TaskOperationProtocol {
private final InstanceConnectionInfo localInstanceConnectionInfo;
/**
- * The instance of the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} which is responsible for
+ * The instance of the {@link ChannelManager} which is responsible for
* setting up and cleaning up the byte buffered channels of the tasks.
*/
private final ChannelManager channelManager;
@@ -286,16 +286,32 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
- // Initialize the byte buffered channel manager
- ChannelManager channelManager = null;
+ int numInThreads = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS);
+
+ int numOutThreads = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS);
+
+ int lowWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETTY_LOW_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK);
+
+ int highWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NETTY_HIGH_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK);
+
+ // Initialize the channel manager
try {
- channelManager = new ChannelManager(this.lookupService, this.localInstanceConnectionInfo, numBuffers, bufferSize);
+ this.channelManager = new ChannelManager(
+ this.lookupService, this.localInstanceConnectionInfo,
+ numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
}
- this.channelManager = channelManager;
-
+
{
HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
@@ -933,7 +949,7 @@ public class TaskManager implements TaskOperationProtocol {
}
@Override
- public void logBufferUtilization() throws IOException {
+ public void logBufferUtilization() {
this.channelManager.logBufferUtilization();
}
@@ -956,7 +972,7 @@ public class TaskManager implements TaskOperationProtocol {
@Override
public void invalidateLookupCacheEntries(final Set<ChannelID> channelIDs) throws IOException {
- this.byteBufferedChannelManager.invalidateLookupCacheEntries(channelIDs);
+ this.channelManager.invalidateLookupCacheEntries(channelIDs);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
new file mode 100644
index 0000000..ed845e8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.nephele.taskmanager.runtime;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecutorThreadFactory implements ThreadFactory {
+
+ public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
+
+ private static final String THREAD_NAME = "Nephele Executor Thread ";
+
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+
+ private ExecutorThreadFactory() {}
+
+
+ public Thread newThread(Runnable target) {
+ Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
index 674f270..79b9c83 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/AbstractIterativePactTask.java
@@ -21,7 +21,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.io.MutableReader;
import eu.stratosphere.pact.runtime.hash.CompactingHashTable;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -36,6 +35,7 @@ import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.ResettablePactDriver;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.udf.RuntimeUDFContext;
+import eu.stratosphere.runtime.io.api.MutableReader;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.InstantiationUtil;
@@ -322,14 +322,15 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
* <p/>
* This collector is used by {@link IterationIntermediatePactTask} or {@link IterationTailPactTask} to update the
* solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
- * {@link SolutionSetFastUpdateOutputCollector} or normal (re-probing) {@link SolutionSetUpdateOutputCollector}
- * is created.
+ * {@link eu.stratosphere.pact.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
+ * {@link SolutionSetUpdateOutputCollector} is created.
* <p/>
* If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
* collect(T) of the delegate.
*
* @param delegate null -OR- a delegate collector to be called by the newly created collector
- * @return a new {@link SolutionSetFastUpdateOutputCollector} or {@link SolutionSetUpdateOutputCollector}
+ * @return a new {@link eu.stratosphere.pact.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
+ * {@link SolutionSetUpdateOutputCollector}
*/
protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
Broker<CompactingHashTable<?>> solutionSetBroker = SolutionSetBroker.instance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
index c39e3ef..89571c4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
@@ -17,6 +17,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import eu.stratosphere.runtime.io.api.BufferWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,8 +28,6 @@ import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.MemorySegment;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
-import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.hash.CompactingHashTable;
import eu.stratosphere.pact.runtime.io.InputViewIterator;
import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 46e3249..6491749 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -18,7 +18,6 @@ import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
-
public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
private final ShipStrategyType strategy; // the shipping strategy used by this output emitter
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index ba352eb..7fe35b4 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -15,13 +15,10 @@ package eu.stratosphere.pact.runtime.shipping;
import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.typeutils.TypeComparator;
-import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.runtime.io.api.ChannelSelector;
-import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
-
public class RecordOutputEmitter implements ChannelSelector<Record> {
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index cb3e782..638a7aa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -193,8 +193,9 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
}
// drop, if the task was canceled
else if (!this.taskCanceled) {
- if (LOG.isErrorEnabled())
+ if (LOG.isErrorEnabled()) {
LOG.error(getLogString("Error in user code: " + ex.getMessage()), ex);
+ }
throw ex;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
index b01799a..92c4648 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
@@ -22,9 +22,7 @@ import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
-import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.execution.CancelTaskException;
import eu.stratosphere.nephele.execution.Environment;
@@ -280,8 +278,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
@Override
public void invoke() throws Exception {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Start task code."));
+ }
// whatever happens in this scope, make sure that the local strategies are cleaned up!
// note that the initialization of the local strategies is in the try-finally block as well,
@@ -390,8 +389,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
public void cancel() throws Exception {
this.running = false;
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Cancelling task code"));
+ }
try {
if (this.driver != null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
index c192cb9..f3f51f1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
@@ -40,8 +40,6 @@ public class Buffer {
}
/**
- * NOTE: Requires that the reference counter was increased prior to the constructor call!
- *
* @param toDuplicate Buffer instance to duplicate
*/
private Buffer(Buffer toDuplicate) {
@@ -74,7 +72,8 @@ public class Buffer {
}
public void recycleBuffer() {
- if (this.referenceCounter.decrementAndGet() == 0) {
+ int refCount = this.referenceCounter.decrementAndGet();
+ if (refCount == 0) {
this.recycler.recycle(this.memorySegment);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
index dfa5d5e..17fff02 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
@@ -17,8 +17,8 @@ import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
-import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
+import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
/**
* The base class for channel objects.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
index 66be7de..ba78c01 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
@@ -13,10 +13,10 @@
package eu.stratosphere.runtime.io.channels;
-import java.nio.ByteBuffer;
-
import eu.stratosphere.nephele.AbstractID;
+import java.nio.ByteBuffer;
+
public class ChannelID extends AbstractID {
public ChannelID() {
@@ -31,6 +31,12 @@ public class ChannelID extends AbstractID {
super(bytes);
}
+ public static ChannelID fromByteBuffer(ByteBuffer buf) {
+ long lower = buf.getLong();
+ long upper = buf.getLong();
+ return new ChannelID(lower, upper);
+ }
+
public static ChannelID fromByteBuffer(ByteBuffer buf, int offset) {
long lower = buf.getLong(offset);
long upper = buf.getLong(offset + 8);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
index 5d5b53d..3007489 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
@@ -24,3 +24,8 @@ public enum ChannelType {
/** In-memory channels */
IN_MEMORY
}
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
index 860141d..6122c36 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
@@ -21,7 +21,7 @@ import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.Envelope;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
@@ -30,7 +30,6 @@ import eu.stratosphere.runtime.io.serialization.RecordDeserializer.Deserializati
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
@@ -83,6 +82,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
private int lastReceivedEnvelope = -1;
+ private ChannelID lastSourceID = null;
+
private boolean destroyCalled = false;
// ----------------------
@@ -104,7 +105,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
* the ID of the channel this channel is connected to
*/
public InputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
- final ChannelID connectedChannelID, ChannelType type) {
+ final ChannelID connectedChannelID, ChannelType type) {
super(channelIndex, channelID, connectedChannelID, type);
this.inputGate = inputGate;
this.deserializer = new AdaptiveSpanningRecordDeserializer<T>();
@@ -158,6 +159,9 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
// public abstract AbstractTaskEvent getCurrentEvent();
+ private DeserializationResult lastDeserializationResult;
+
+
public InputChannelResult readRecord(T target) throws IOException {
if (this.dataBuffer == null) {
if (isClosed()) {
@@ -176,7 +180,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
{
// sanity check: an event may only come after a complete record.
if (this.deserializer.hasUnfinishedData()) {
- throw new IOException("Channel received an event before completing the current partial record.");
+ throw new IllegalStateException("Channel received an event before completing the current partial record.");
}
AbstractEvent evt = boe.getEvent();
@@ -202,8 +206,8 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
}
}
-
DeserializationResult deserializationResult = this.deserializer.getNextRecord(target);
+ this.lastDeserializationResult = deserializationResult;
if (deserializationResult.isBufferConsumed()) {
releasedConsumedReadBuffer(this.dataBuffer);
@@ -348,6 +352,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
this.queuedEnvelopes.add(envelope);
this.lastReceivedEnvelope = sequenceNumber;
+ this.lastSourceID = envelope.getSource();
// Notify the channel about the new data. notify as much as there is (buffer plus once per event)
if (envelope.getBuffer() != null) {
@@ -432,7 +437,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
}
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
return this.inputGate.registerBufferAvailabilityListener(listener);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
index 7ca916c..f4fed65 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
@@ -17,7 +17,7 @@ import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.Envelope;
import eu.stratosphere.runtime.io.gates.OutputGate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -107,7 +107,6 @@ public class OutputChannel extends Channel {
this.receiverCloseRequested = true;
this.closeLock.notifyAll();
}
- LOG.debug("OutputChannel received close event from target.");
}
else if (event instanceof AbstractTaskEvent) {
if (LOG.isDebugEnabled()) {
@@ -165,7 +164,7 @@ public class OutputChannel extends Channel {
private void checkStatus() throws IOException {
if (this.senderCloseRequested) {
- throw new IllegalStateException(String.format("Channel %s already requested to be closed.", getID()));
+ throw new IllegalStateException(String.format("Channel %s already requested to be closed", getID()));
}
if (this.receiverCloseRequested) {
throw new ReceiverAlreadyClosedException();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
index bdac7a2..c623220 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/InputGate.java
@@ -14,9 +14,6 @@
package eu.stratosphere.runtime.io.gates;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
@@ -24,7 +21,6 @@ import java.util.concurrent.atomic.AtomicReference;
import eu.stratosphere.nephele.deployment.ChannelDeploymentDescriptor;
import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
import eu.stratosphere.runtime.io.Buffer;
-import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,7 +34,6 @@ import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.runtime.io.channels.InputChannel;
-import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.nephele.jobgraph.JobID;
/**
@@ -378,7 +373,7 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
}
@Override
- public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+ public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
return this.bufferPool.registerBufferAvailabilityListener(listener);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cd4a134/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
index d3eaea1..bff9180 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/gates/OutputGate.java
@@ -19,7 +19,6 @@ import eu.stratosphere.nephele.deployment.GateDeploymentDescriptor;
import eu.stratosphere.nephele.event.task.AbstractEvent;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.channels.ChannelID;
-import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.runtime.io.channels.OutputChannel;
import eu.stratosphere.nephele.jobgraph.JobID;