You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:35 UTC
[14/22] git commit: Removed RuntimeEnvironment instantiation from
execution graph construction. Removed legacy job vertex classes and
input/output tasks.
Removed RuntimeEnvironment instantiation from execution graph construction. Removed legacy job vertex classes and input/output tasks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ea79186b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ea79186b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ea79186b
Branch: refs/heads/master
Commit: ea79186b7ef787991fa1c4dbfa29f26c7aefd804
Parents: 429493d
Author: Till Rohrmann <ti...@mailbox.tu-berlin.de>
Authored: Wed Mar 26 02:58:15 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 33 +--
.../api/common/io/FileOutputFormat.java | 38 ++++
.../api/common/io/OutputFormat.java | 7 +
.../configuration/Configuration.java | 12 ++
.../api/java/io/PrintingOutputFormat.java | 3 +
.../nephele/execution/RuntimeEnvironment.java | 56 ++----
.../nephele/executiongraph/ExecutionGraph.java | 86 ++------
.../executiongraph/ExecutionGroupVertex.java | 53 ++---
.../jobgraph/AbstractJobInputVertex.java | 7 +
.../nephele/jobgraph/AbstractJobVertex.java | 62 +-----
.../nephele/jobgraph/JobFileInputVertex.java | 195 ------------------
.../nephele/jobgraph/JobFileOutputVertex.java | 198 ------------------
.../nephele/jobgraph/JobGenericInputVertex.java | 168 ----------------
.../jobgraph/JobGenericOutputVertex.java | 182 -----------------
.../nephele/jobgraph/JobInputVertex.java | 90 ++++++++-
.../nephele/jobgraph/JobOutputVertex.java | 56 ++++++
.../nephele/jobgraph/JobTaskVertex.java | 17 --
.../splitassigner/InputSplitManager.java | 13 +-
.../LocatableInputSplitAssigner.java | 14 +-
.../file/FileInputSplitAssigner.java | 14 +-
.../nephele/template/AbstractFileInputTask.java | 201 -------------------
.../template/AbstractFileOutputTask.java | 46 -----
.../template/AbstractGenericInputTask.java | 39 ----
.../nephele/template/AbstractInputTask.java | 23 ---
.../nephele/template/AbstractInvokable.java | 33 ---
.../nephele/template/GenericInputTask.java | 39 ----
.../pact/runtime/task/DataSinkTask.java | 58 ------
.../pact/runtime/task/DataSourceTask.java | 60 +++---
.../pact/runtime/task/util/TaskConfig.java | 4 +
.../TaskDeploymentDescriptorTest.java | 8 +-
.../executiongraph/SelfCrossInputTask.java | 41 ----
.../nephele/jobmanager/DoubleSourceTask.java | 81 --------
.../nephele/jobmanager/DoubleTargetTask.java | 18 +-
.../nephele/jobmanager/ForwardTask.java | 12 +-
.../nephele/jobmanager/UnionTask.java | 16 +-
.../scheduler/queue/DefaultSchedulerTest.java | 43 ++++
.../nephele/util/FileLineReader.java | 80 --------
.../nephele/util/FileLineWriter.java | 75 -------
.../io/library/FileLineReadWriteTest.java | 136 -------------
.../recordJobs/util/DiscardingOutputFormat.java | 3 +
40 files changed, 412 insertions(+), 1908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index b4c7560..200ef7c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -20,7 +20,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.template.AbstractInputTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
import eu.stratosphere.api.common.aggregators.AggregatorWithName;
@@ -805,31 +812,31 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
- final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-
+
// set task class
@SuppressWarnings("unchecked")
- final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
+ final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask
+ .class;
vertex.setInputClass(clazz);
// set user code
- config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
- config.setStubParameters(node.getPactContract().getParameters());
-
- config.setOutputSerializer(node.getSerializer());
+ vertex.setInputFormat((UserCodeWrapper<? extends InputFormat<?, InputSplit>>)node.getPactContract()
+ .getUserCodeWrapper());
+ vertex.setInputFormatParameters(node.getPactContract().getParameters());
+ vertex.setOutputSerializer(node.getSerializer());
return vertex;
}
private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
- final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-
+
vertex.setOutputClass(DataSinkTask.class);
vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
-
+
// set user code
- config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
- config.setStubParameters(node.getPactContract().getParameters());
+ vertex.setOutputFormat((UserCodeWrapper<? extends OutputFormat<?>>)node.getPactContract().getUserCodeWrapper
+ ());
+ vertex.setOutputFormatParameters(node.getPactContract().getParameters());
return vertex;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
index b04ced9..7733c71 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
@@ -437,4 +437,42 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
super(targetConfig);
}
}
+
+ @Override
+ public void initialize(Configuration configuration){
+ final Path path = this.getOutputFilePath();
+ final WriteMode writeMode = this.getWriteMode();
+ final OutputDirectoryMode outDirMode = this.getOutputDirectoryMode();
+
+ // Prepare output path and determine max DOP
+ try {
+ final FileSystem fs = path.getFileSystem();
+
+ int dop = configuration.getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
+ if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
+ // output is not written in parallel and should be written to a single file.
+
+ if(fs.isDistributedFS()) {
+ // prepare distributed output path
+ if(!fs.initOutPathDistFS(path, writeMode, false)) {
+ // output preparation failed! Cancel task.
+ throw new IOException("Output path could not be initialized.");
+ }
+ }
+ } else {
+ // output should be written to a directory
+
+ if(fs.isDistributedFS()) {
+ // only distributed file systems can be initialized at start-up time.
+ if(!fs.initOutPathDistFS(path, writeMode, true)) {
+ throw new IOException("Output directory could not be created.");
+ }
+ }
+ }
+ }
+ catch (IOException e) {
+ LOG.error("Could not access the file system to detemine the status of the output.", e);
+ throw new RuntimeException("I/O Error while accessing file", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
index c32be78..3b66902 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
@@ -79,5 +79,12 @@ public interface OutputFormat<IT> extends Serializable {
* @throws IOException Thrown, if the input could not be closed properly.
*/
void close() throws IOException;
+
+ /**
+ * Method which is called on the master node prior to execution. It can be used to set up the output format.
+ *
+ * @param configuration The task configuration
+ */
+ void initialize(Configuration configuration);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
index 46cadc3..0271b59 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
@@ -405,6 +405,18 @@ public class Configuration implements IOReadableWritable {
}
}
}
+
+ /**
+ * Checks whether there is an entry with key
+ *
+ * @param key key of entry
+ * @return true if entry with key is stored in the configuration, otherwise false
+ */
+ public boolean containsKey(String key){
+ synchronized (this.confData){
+ return this.confData.containsKey(key);
+ }
+ }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
index 5c09439..d1736d4 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
@@ -95,4 +95,7 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
+
+ @Override
+ public void initialize(Configuration configuration){}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 4e07694..70718a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -83,6 +83,12 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
/**
+ * Queue of unbound output gate IDs which are required for deserializing an environment in the course of an RPC
+ * call.
+ */
+ private final Queue<GateID> unboundOutputGateIDs = new ArrayDeque<GateID>();
+
+ /**
* Queue of unbound input gate IDs which are required for deserializing an environment in the course of an RPC
* call.
*/
@@ -165,46 +171,18 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private volatile boolean canceled;
/**
- * Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
- * task.
- *
- * @param jobID the ID of the original Nephele job
- * @param taskName the name of task running in this environment
- * @param invokableClass invokableClass the class that should be instantiated as a Nephele task
- * @param taskConfiguration the configuration object which was attached to the original JobVertex
- * @param jobConfiguration the configuration object which was attached to the original JobGraph
- * @throws Exception thrown if an error occurs while instantiating the invokable class
- */
- public RuntimeEnvironment(final JobID jobID, final String taskName,
- final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
- final Configuration jobConfiguration)
- throws Exception
- {
- this.jobID = jobID;
- this.taskName = taskName;
- this.invokableClass = invokableClass;
- this.taskConfiguration = taskConfiguration;
- this.jobConfiguration = jobConfiguration;
- this.indexInSubtaskGroup = 0;
- this.currentNumberOfSubtasks = 0;
- this.memoryManager = null;
- this.ioManager = null;
- this.inputSplitProvider = null;
- this.cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
-
- this.invokable = this.invokableClass.newInstance();
- this.invokable.setEnvironment(this);
- this.invokable.registerInputOutput();
- }
-
- /**
* Constructs a runtime environment from a task deployment description.
- *
- * @param tdd the task deployment description
- * @param memoryManager the task manager's memory manager component
- * @param ioManager the task manager's I/O manager component
- * @param inputSplitProvider the input split provider for this environment
- * @throws Exception thrown if an error occurs while instantiating the invokable class
+ *
+ * @param tdd
+ * the task deployment description
+ * @param memoryManager
+ * the task manager's memory manager component
+ * @param ioManager
+ * the task manager's I/O manager component
+ * @param inputSplitProvider
+ * the input split provider for this environment
+ * @throws Exception
+ * thrown if an error occurs while instantiating the invokable class
*/
public RuntimeEnvironment(final TaskDeploymentDescriptor tdd,
final MemoryManager memoryManager, final IOManager ioManager,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index c5059f9..93e0a25 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -399,17 +399,6 @@ public class ExecutionGraph implements ExecutionListener {
final ExecutionVertex sev = entry.getValue();
final ExecutionGroupVertex sgv = sev.getGroupVertex();
- // First compare number of output gates
- if (sjv.getNumberOfForwardConnections() != sgv.getEnvironment().getNumberOfOutputGates()) {
- throw new GraphConversionException("Job and execution vertex " + sjv.getName()
- + " have different number of outputs");
- }
-
- if (sjv.getNumberOfBackwardConnections() != sgv.getEnvironment().getNumberOfInputGates()) {
- throw new GraphConversionException("Job and execution vertex " + sjv.getName()
- + " have different number of inputs");
- }
-
// First, build the group edges
for (int i = 0; i < sjv.getNumberOfForwardConnections(); ++i) {
final JobEdge edge = sjv.getForwardConnection(i);
@@ -488,16 +477,13 @@ public class ExecutionGraph implements ExecutionListener {
final InputSplit[] inputSplits;
- // let the task code compute the input splits
- if (groupVertex.getEnvironment().getInvokable() instanceof AbstractInputTask) {
- try {
- inputSplits = ((AbstractInputTask<?>) groupVertex.getEnvironment().getInvokable())
- .computeInputSplits(jobVertex.getNumberOfSubtasks());
- } catch (Exception e) {
- throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), e);
- }
- } else {
- throw new GraphConversionException("JobInputVertex contained a task class which was not an input task.");
+ final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
+
+ try{
+ inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
+ }catch(Exception e) {
+ throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName() + ": "
+ + StringUtils.stringifyException(e));
}
if (inputSplits == null) {
@@ -507,13 +493,19 @@ public class ExecutionGraph implements ExecutionListener {
+ " input splits");
}
- // assign input splits
+ // assign input splits and type
groupVertex.setInputSplits(inputSplits);
+ groupVertex.setInputSplitType(inputSplitType);
}
- // TODO: This is a quick workaround, problem can be solved in a more generic way
- if (jobVertex instanceof JobFileOutputVertex) {
- final JobFileOutputVertex jbov = (JobFileOutputVertex) jobVertex;
- jobVertex.getConfiguration().setString("outputPath", jbov.getFilePath().toString());
+
+ if(jobVertex instanceof JobOutputVertex){
+ final JobOutputVertex jobOutputVertex = (JobOutputVertex) jobVertex;
+
+ final OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
+
+ if(outputFormat != null){
+ outputFormat.initialize(groupVertex.getConfiguration());
+ }
}
// Add group vertex to initial execution stage
@@ -796,48 +788,6 @@ public class ExecutionGraph implements ExecutionListener {
}
/**
- * Retrieves the maximum parallel degree of the job represented by this execution graph
- */
- public int getMaxNumberSubtasks() {
- int maxDegree = 0;
- final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
-
- while(stageIterator.hasNext()){
- final ExecutionStage stage = stageIterator.next();
-
- int maxPerStageDegree = stage.getMaxNumberSubtasks();
-
- if(maxPerStageDegree > maxDegree){
- maxDegree = maxPerStageDegree;
- }
- }
-
- return maxDegree;
- }
-
- /**
- * Retrieves the number of required slots to run this execution graph
- * @return
- */
- public int getRequiredSlots(){
- int maxRequiredSlots = 0;
-
- final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
-
- while(stageIterator.hasNext()){
- final ExecutionStage stage = stageIterator.next();
-
- int requiredSlots = stage.getRequiredSlots();
-
- if(requiredSlots > maxRequiredSlots){
- maxRequiredSlots = requiredSlots;
- }
- }
-
- return maxRequiredSlots;
- }
-
- /**
* Returns the stage which is currently executed.
*
* @return the currently executed stage or <code>null</code> if the job execution is already completed
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index c865609..91e9e53 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -114,6 +114,11 @@ public final class ExecutionGroupVertex {
private volatile InputSplit[] inputSplits = null;
/**
+ * Input split type
+ */
+ private volatile Class<? extends InputSplit> inputSplitType = null;
+
+ /**
* The execution stage this vertex belongs to.
*/
private volatile ExecutionStage executionStage = null;
@@ -129,11 +134,6 @@ public final class ExecutionGroupVertex {
private final Class<? extends AbstractInvokable> invokableClass;
/**
- * The environment created to execute the vertex's task.
- */
- private final RuntimeEnvironment environment;
-
- /**
* Constructs a new group vertex.
*
* @param name
@@ -177,9 +177,6 @@ public final class ExecutionGroupVertex {
this.executionSignature = signature;
this.invokableClass = invokableClass;
-
- this.environment = new RuntimeEnvironment(executionGraph.getJobID(), name, invokableClass, configuration,
- executionGraph.getJobConfiguration());
}
/**
@@ -192,16 +189,6 @@ public final class ExecutionGroupVertex {
}
/**
- * Returns the environment of the instantiated {@link AbstractInvokable} object.
- *
- * @return the environment of the instantiated {@link AbstractInvokable} object
- */
- public RuntimeEnvironment getEnvironment() {
-
- return this.environment;
- }
-
- /**
* Sets the execution stage this group vertex is associated with.
*
* @param executionStage
@@ -407,20 +394,6 @@ public final class ExecutionGroupVertex {
}
}
- // Make sure the value of newNumber is valid
- // TODO: Move these checks to some other place
- /*
- * if (this.getMinimumNumberOfGroupMember() < 1) {
- * throw new GraphConversionException("The minimum number of members is below 1 for group vertex "
- * + this.getName());
- * }
- * if ((this.getMaximumNumberOfGroupMembers() != -1)
- * && (this.getMaximumNumberOfGroupMembers() < this.getMinimumNumberOfGroupMember())) {
- * throw new GraphConversionException(
- * "The maximum number of members is smaller than the minimum for group vertex " + this.getName());
- * }
- */
-
final ExecutionVertex originalVertex = this.getGroupMember(0);
int currentNumberOfExecutionVertices = this.getCurrentNumberOfGroupMembers();
@@ -453,6 +426,14 @@ public final class ExecutionGroupVertex {
}
/**
+ * Sets the input split type class
+ *
+ * @param inputSplitType Input split type class
+ */
+ public void setInputSplitType(final Class<? extends InputSplit> inputSplitType) { this.inputSplitType =
+ inputSplitType; }
+
+ /**
* Returns the input splits assigned to this group vertex.
*
* @return the input splits, possibly <code>null</code> if the group vertex does not represent an input vertex
@@ -462,6 +443,14 @@ public final class ExecutionGroupVertex {
return this.inputSplits;
}
+ /**
+ * Returns the input split type class
+ *
+ * @return the input split type class, possibly <code>null</code> if the group vertex does not represent an input
+ * vertex
+ */
+ public Class<? extends InputSplit> getInputSplitType() { return this.inputSplitType; }
+
public ExecutionGroupEdge getForwardEdge(int index) {
if (index < 0) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
index 958ed9d..22b4d7c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
@@ -13,6 +13,10 @@
package eu.stratosphere.nephele.jobgraph;
+import eu.stratosphere.core.io.InputSplit;
+
+import java.io.IOException;
+
/**
* An abstract base class for input vertices in Nephele.
*
@@ -34,4 +38,7 @@ public abstract class AbstractJobInputVertex extends AbstractJobVertex {
jobGraph.addVertex(this);
}
+
+ public abstract Class<? extends InputSplit> getInputSplitType();
+ public abstract InputSplit[] getInputSplits(int minNumSplits) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index d64c622..7cec46a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -573,65 +573,15 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
}
/**
- * Performs task specific checks if the
- * respective task has been configured properly.
- *
- * @param invokable
- * an instance of the task this vertex represents
+ * Performs check whether the vertex has been properly configured
+ *
+ * @param configuration
+ * configuration of this vertex
* @throws IllegalConfigurationException
* thrown if the respective tasks is not configured properly
*/
- public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
- if (invokable == null) {
- throw new IllegalArgumentException("Argument invokable is null");
- }
-
- // see if the task itself has a valid configuration
- // because this is user code running on the master, we embed it in a catch-all block
- try {
- invokable.checkConfiguration();
- } catch (IllegalConfigurationException icex) {
- throw icex; // simply forward
- } catch (Throwable t) {
- throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: "
- + StringUtils.stringifyException(t));
- }
- }
-
- /**
- * Returns the minimum number of subtasks the respective task
- * must be split into at runtime.
- *
- * @param invokable
- * an instance of the task this vertex represents
- * @return the minimum number of subtasks the respective task must be split into at runtime
- */
- public int getMinimumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- if (invokable == null) {
- throw new IllegalArgumentException("Argument invokable is null");
- }
-
- return invokable.getMinimumNumberOfSubtasks();
- }
-
- /**
- * Returns the maximum number of subtasks the respective task
- * can be split into at runtime.
- *
- * @param invokable
- * an instance of the task this vertex represents
- * @return the maximum number of subtasks the respective task can be split into at runtime, <code>-1</code> for
- * infinity
- */
- public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- if (invokable == null) {
- throw new IllegalArgumentException("Argument invokable is null");
- }
-
- return invokable.getMaximumNumberOfSubtasks();
+ public void checkConfiguration(final Configuration configuration) throws IllegalConfigurationException {
+ //default configuration check
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
deleted file mode 100644
index 65685ee..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileInputVertex.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * A JobFileInputVertex is a specific subtype of a {@link AbstractJobInputVertex} and is designed
- * for Nephele tasks which read data from a local or distributed file system. As every job input vertex
- * A JobFileInputVertex must not have any further input.
- *
- */
-public final class JobFileInputVertex extends AbstractJobInputVertex {
-
- /**
- * The path pointing to the input file/directory.
- */
- private Path path = null;
-
- /**
- * Creates a new job file input vertex with the specified name.
- *
- * @param name
- * the name of the new job file input vertex
- * @param id
- * the ID of this vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex with the specified name.
- *
- * @param name
- * the name of the new job file input vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileInputVertex(final String name, final JobGraph jobGraph) {
- super(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileInputVertex(final JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the path of the file the job file input vertex's task should read from.
- *
- * @param path
- * the path of the file the job file input vertex's task should read from
- */
- public void setFilePath(final Path path) {
- this.path = path;
- }
-
- /**
- * Returns the path of the file the job file input vertex's task should read from.
- *
- * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
- * has yet been set
- */
- public Path getFilePath() {
- return this.path;
- }
-
- /**
- * Sets the class of the vertex's input task.
- *
- * @param inputClass
- * the class of the vertex's input task.
- */
- public void setFileInputClass(final Class<? extends AbstractFileInputTask> inputClass) {
- this.invokableClass = inputClass;
- }
-
- /**
- * Returns the class of the vertex's input task.
- *
- * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
- */
- @SuppressWarnings("unchecked")
- public Class<? extends AbstractFileInputTask> getFileInputClass() {
- return (Class<? extends AbstractFileInputTask>) this.invokableClass;
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
- super.read(in);
-
- // Read path of the input file
- final boolean isNotNull = in.readBoolean();
- if (isNotNull) {
- this.path = new Path();
- this.path.read(in);
- }
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
- super.write(out);
-
- // Write out the path of the input file
- if (this.path == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.path.write(out);
- }
-
- }
-
-
- @Override
- public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
- // Check if the user has specified a path
- if (this.path == null) {
- throw new IllegalConfigurationException(this.getName() + " does not specify an input path");
- }
-
- // Check if the path is valid
- try {
- final FileSystem fs = this.path.getFileSystem();
- final FileStatus f = fs.getFileStatus(this.path);
- if (f == null) {
- throw new IOException(this.path.toString() + " led to a null object");
- }
- } catch (IOException e) {
- throw new IllegalConfigurationException("Cannot access file or directory: "
- + StringUtils.stringifyException(e));
- }
-
- // register the path in the configuration
- invokable.getTaskConfiguration()
- .setString(AbstractFileInputTask.INPUT_PATH_CONFIG_KEY, this.path.toString());
-
- // Finally, see if the task itself has a valid configuration
- super.checkConfiguration(invokable);
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- int numberOfBlocks = -1;
-
- if (this.path == null) {
- return -1;
- }
-
- try {
- final FileSystem fs = this.path.getFileSystem();
- final FileStatus f = fs.getFileStatus(this.path);
- numberOfBlocks = fs.getNumberOfBlocks(f);
-
- } catch (IOException e) {
- return -1;
- }
-
- return (int) Math.min(numberOfBlocks, invokable.getMaximumNumberOfSubtasks());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
deleted file mode 100644
index 645041a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobFileOutputVertex.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * A JobFileOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
- * for Nephele tasks which write data to a local or distributed file system. As every job output vertex
- * A JobFileOutputVertex must not have any further output.
- *
- */
-public class JobFileOutputVertex extends AbstractJobOutputVertex {
-
- /**
- * The path pointing to the output file/directory.
- */
- private Path path = null;
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param id
- * the ID of this vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileOutputVertex(final String name, final JobGraph jobGraph) {
- super(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobFileOutputVertex(final JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the path of the file the job file input vertex's task should write to.
- *
- * @param path
- * the path of the file the job file input vertex's task should write to
- */
- public void setFilePath(final Path path) {
- this.path = path;
- }
-
- /**
- * Returns the path of the file the job file output vertex's task should write to.
- *
- * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
- * has yet been set
- */
-
- public Path getFilePath() {
- return this.path;
- }
-
- /**
- * Sets the class of the vertex's output task.
- *
- * @param outputClass
- * the class of the vertex's output task.
- */
- public void setFileOutputClass(final Class<? extends AbstractFileOutputTask> outputClass) {
- this.invokableClass = outputClass;
- }
-
- /**
- * Returns the class of the vertex's output task.
- *
- * @return the class of the vertex's output task or <code>null</code> if no task has yet been set
- */
- @SuppressWarnings("unchecked")
- public Class<? extends AbstractFileOutputTask> getFileOutputClass() {
- return (Class<? extends AbstractFileOutputTask>) this.invokableClass;
- }
-
-
- @Override
- public void read(final DataInput in) throws IOException {
- super.read(in);
-
- // Read path of the input file
- boolean isNotNull = in.readBoolean();
- if (isNotNull) {
- this.path = new Path();
- this.path.read(in);
- }
- }
-
-
- @Override
- public void write(final DataOutput out) throws IOException {
- super.write(out);
-
- // Write out the path of the input file
- if (this.path == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- this.path.write(out);
- }
- }
-
-
- @Override
- public void checkConfiguration(final AbstractInvokable invokable) throws IllegalConfigurationException {
-
- // Check if the user has specified a path
- if (this.path == null) {
- throw new IllegalConfigurationException(this.getName() + " does not specify an output path");
- }
-
- super.checkConfiguration(invokable);
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- if (this.path == null) {
- return 0;
- }
-
- // Check if the path is valid
- try {
- final FileSystem fs = path.getFileSystem();
-
- try {
- final FileStatus f = fs.getFileStatus(path);
-
- if (f == null) {
- return 1;
- }
-
- // If the path points to a directory we allow an infinity number of subtasks
- if (f.isDir()) {
- return -1;
- }
- } catch (FileNotFoundException fnfex) {
- // The exception is thrown if the requested file/directory does not exist.
- // if the degree of parallelism is > 1, we create a directory for this path
- if (getNumberOfSubtasks() > 1) {
- fs.mkdirs(path);
- return -1;
- } else {
- // a none existing file and a degree of parallelism that is one
- return 1;
- }
- }
- } catch (IOException e) {
- // any other kind of I/O exception: we assume only a degree of one here
- return 1;
- }
-
- return 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
deleted file mode 100644
index 658ea0d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericInputVertex.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
-
-public class JobGenericInputVertex extends JobInputVertex
-{
- /**
- * Class of input task.
- */
- protected Class<? extends AbstractInputTask<?>> inputClass = null;
-
- /**
- * Creates a new job input vertex with the specified name.
- *
- * @param name The name of the new job file input vertex.
- * @param id The ID of this vertex.
- * @param jobGraph The job graph this vertex belongs to.
- */
- public JobGenericInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex with the specified name.
- *
- * @param name The name of the new job file input vertex.
- * @param jobGraph The job graph this vertex belongs to.
- */
- public JobGenericInputVertex(String name, JobGraph jobGraph) {
- super(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph The job graph this vertex belongs to.
- */
- public JobGenericInputVertex(JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the class of the vertex's input task.
- *
- * @param inputClass The class of the vertex's input task.
- */
- public void setInputClass(Class<? extends AbstractInputTask<?>> inputClass) {
- this.inputClass = inputClass;
- }
-
- /**
- * Returns the class of the vertex's input task.
- *
- * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
- */
- public Class<? extends AbstractInputTask<?>> getInputClass() {
- return this.inputClass;
- }
-
-
- @SuppressWarnings("unchecked")
- @Override
- public void read(DataInput in) throws IOException
- {
- super.read(in);
-
- // Read class
- boolean isNotNull = in.readBoolean();
- if (isNotNull) {
- // Read the name of the class and try to instantiate the class object
- final ClassLoader cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
- if (cl == null) {
- throw new IOException("Cannot find class loader for vertex " + getID());
- }
-
- // Read the name of the expected class
- final String className = StringRecord.readString(in);
-
- try {
- this.inputClass = (Class<? extends AbstractInputTask<?>>) Class.forName(className, true, cl).asSubclass(AbstractInputTask.class);
- }
- catch (ClassNotFoundException cnfe) {
- throw new IOException("Class " + className + " not found in one of the supplied jar files: "
- + StringUtils.stringifyException(cnfe));
- }
- catch (ClassCastException ccex) {
- throw new IOException("Class " + className + " is not a subclass of "
- + AbstractInputTask.class.getName() + ": " + StringUtils.stringifyException(ccex));
- }
- }
- }
-
-
- @Override
- public void write(DataOutput out) throws IOException
- {
- super.write(out);
-
- // Write out the name of the class
- if (this.inputClass == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- StringRecord.writeString(out, this.inputClass.getName());
- }
- }
-
-
- @Override
- public void checkConfiguration(AbstractInvokable invokable) throws IllegalConfigurationException
- {
- // see if the task itself has a valid configuration
- // because this is user code running on the master, we embed it in a catch-all block
- try {
- invokable.checkConfiguration();
- }
- catch (IllegalConfigurationException icex) {
- throw icex; // simply forward
- }
- catch (Throwable t) {
- throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: "
- + StringUtils.stringifyException(t));
- }
- }
-
-
- @Override
- public Class<? extends AbstractInvokable> getInvokableClass() {
-
- return this.inputClass;
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks(AbstractInvokable invokable)
- {
- return invokable.getMaximumNumberOfSubtasks();
- }
-
-
- @Override
- public int getMinimumNumberOfSubtasks(AbstractInvokable invokable) {
-
- return invokable.getMinimumNumberOfSubtasks();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
deleted file mode 100644
index a5b0665..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGenericOutputVertex.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobgraph;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.configuration.IllegalConfigurationException;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * A JobGenericOutputVertex is a specific subtype of a {@link JobOutputVertex} and is designed
- * for Nephele tasks which sink data in a not further specified way. As every job output vertex,
- * a JobGenericOutputVertex must not have any further output.
- *
- */
-public class JobGenericOutputVertex extends JobOutputVertex {
-
- /**
- * The class of the output task.
- */
- protected Class<? extends AbstractOutputTask> outputClass = null;
-
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param id
- * the ID of this vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobGenericOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
-
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobGenericOutputVertex(String name, JobGraph jobGraph) {
- super(name, null, jobGraph);
- }
-
- /**
- * Creates a new job file input vertex.
- *
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobGenericOutputVertex(JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the class of the vertex's output task.
- *
- * @param outputClass The class of the vertex's output task.
- */
- public void setOutputClass(Class<? extends AbstractOutputTask> outputClass) {
- this.outputClass = outputClass;
- }
-
- /**
- * Returns the class of the vertex's output task.
- *
- * @return The class of the vertex's output task or <code>null</code> if no task has yet been set.
- */
- public Class<? extends AbstractOutputTask> getOutputClass() {
- return this.outputClass;
- }
-
-
- @Override
- public void read(DataInput in) throws IOException {
- super.read(in);
-
- // Read class
- boolean isNotNull = in.readBoolean();
- if (isNotNull) {
-
- // Read the name of the class and try to instantiate the class object
- final ClassLoader cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
- if (cl == null) {
- throw new IOException("Cannot find class loader for vertex " + getID());
- }
-
- // Read the name of the expected class
- final String className = StringRecord.readString(in);
-
- try {
- this.outputClass = Class.forName(className, true, cl).asSubclass(AbstractOutputTask.class);
- }
- catch (ClassNotFoundException cnfe) {
- throw new IOException("Class " + className + " not found in one of the supplied jar files: "
- + StringUtils.stringifyException(cnfe));
- }
- catch (ClassCastException ccex) {
- throw new IOException("Class " + className + " is not a subclass of "
- + AbstractOutputTask.class.getName() + ": " + StringUtils.stringifyException(ccex));
- }
- }
- }
-
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
-
- // Write out the name of the class
- if (this.outputClass == null) {
- out.writeBoolean(false);
- }
- else {
- out.writeBoolean(true);
- StringRecord.writeString(out, this.outputClass.getName());
- }
- }
-
-
- @Override
- public void checkConfiguration(AbstractInvokable invokable) throws IllegalConfigurationException
- {
- // see if the task itself has a valid configuration
- // because this is user code running on the master, we embed it in a catch-all block
- try {
- invokable.checkConfiguration();
- }
- catch (IllegalConfigurationException icex) {
- throw icex; // simply forward
- }
- catch (Throwable t) {
- throw new IllegalConfigurationException("Checking the invokable's configuration caused an error: "
- + StringUtils.stringifyException(t));
- }
- }
-
-
- @Override
- public Class<? extends AbstractInvokable> getInvokableClass() {
-
- return this.outputClass;
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks(AbstractInvokable invokable)
- {
- // Delegate call to invokable
- return invokable.getMaximumNumberOfSubtasks();
- }
-
-
- @Override
- public int getMinimumNumberOfSubtasks(AbstractInvokable invokable)
- {
- // Delegate call to invokable
- return invokable.getMinimumNumberOfSubtasks();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
index a22d7ca..9e5f6c7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
@@ -13,9 +13,21 @@
package eu.stratosphere.nephele.jobgraph;
+import eu.stratosphere.api.common.io.InputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractInputTask;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+
+import java.io.DataInput;
+import java.io.IOException;
public class JobInputVertex extends AbstractJobInputVertex {
+ private volatile InputFormat<?, ? extends InputSplit> inputFormat = null;
/**
* Creates a new job input vertex with the specified name.
@@ -55,7 +67,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
/**
* Sets the class of the vertex's input task.
- *
+ *
* @param inputClass
* The class of the vertex's input task.
*/
@@ -72,4 +84,80 @@ public class JobInputVertex extends AbstractJobInputVertex {
public Class<? extends AbstractInputTask<?>> getInputClass() {
return (Class<? extends AbstractInputTask<?>>) this.invokableClass;
}
+
+ public void setInputFormat(UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> inputFormatWrapper) {
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubWrapper(inputFormatWrapper);
+
+ inputFormat = inputFormatWrapper.getUserCodeObject();
+ }
+
+ public void setInputFormat(InputFormat<?, ? extends InputSplit> inputFormat) {
+ this.inputFormat = inputFormat;
+
+ UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> wrapper = new
+ UserCodeObjectWrapper<InputFormat<?, ? extends InputSplit>>(inputFormat);
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubWrapper(wrapper);
+ }
+
+ public void setInputFormatParameters(Configuration inputFormatParameters){
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubParameters(inputFormatParameters);
+
+ if(inputFormat == null){
+ throw new RuntimeException("There is no input format set in job vertex: " + this.getID());
+ }
+
+ inputFormat.configure(inputFormatParameters);
+ }
+
+ public void setOutputSerializer(TypeSerializerFactory<?> factory){
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setOutputSerializer(factory);
+ }
+
+
+ @Override
+ public void read(final DataInput input) throws IOException{
+ super.read(input);
+
+ // load input format wrapper from the config
+ ClassLoader cl = null;
+
+ try{
+ cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+ this.getJobGraph().getJobID(), ioe);
+ }
+
+ final Configuration config = this.getConfiguration();
+ config.setClassLoader(cl);
+ final TaskConfig taskConfig = new TaskConfig(config);
+
+ inputFormat = taskConfig.<InputFormat<?, InputSplit>>getStubWrapper(cl).getUserCodeObject(InputFormat.class,
+ cl);
+
+ inputFormat.configure(taskConfig.getStubParameters());
+ }
+
+ @Override
+ public Class<? extends InputSplit> getInputSplitType() {
+ if(inputFormat == null){
+ throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+ }
+
+ return inputFormat.getInputSplitType();
+ }
+
+ @Override
+ public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
+ if(inputFormat == null){
+ throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
+ }
+
+ return inputFormat.createInputSplits(minNumSplits);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
index 31452c3..154e639 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
@@ -13,7 +13,16 @@
package eu.stratosphere.nephele.jobgraph;
+import eu.stratosphere.api.common.io.OutputFormat;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.pact.runtime.task.util.TaskConfig;
+
+import java.io.DataInput;
+import java.io.IOException;
/**
* A JobOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
@@ -22,6 +31,7 @@ import eu.stratosphere.nephele.template.AbstractOutputTask;
*
*/
public class JobOutputVertex extends AbstractJobOutputVertex {
+ private volatile OutputFormat<?> outputFormat = null;
/**
* Creates a new job file output vertex with the specified name.
@@ -78,4 +88,50 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
public Class<? extends AbstractOutputTask> getOutputClass() {
return (Class<? extends AbstractOutputTask>) this.invokableClass;
}
+
+ public void setOutputFormat(UserCodeWrapper<? extends OutputFormat<?>> outputFormatWrapper){
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubWrapper(outputFormatWrapper);
+ outputFormat = outputFormatWrapper.getUserCodeObject();
+ }
+
+ public void setOutputFormat(OutputFormat<?> outputFormat){
+ this.outputFormat = outputFormat;
+ UserCodeWrapper<? extends OutputFormat<?>> wrapper = new UserCodeObjectWrapper<OutputFormat<?>>
+ (outputFormat);
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubWrapper(wrapper);
+ }
+
+ public void setOutputFormatParameters(Configuration parameters){
+ TaskConfig config = new TaskConfig(this.getConfiguration());
+ config.setStubParameters(parameters);
+
+ outputFormat.configure(parameters);
+ }
+
+ @Override
+ public void read(final DataInput input) throws IOException{
+ super.read(input);
+
+ ClassLoader cl = null;
+ try{
+ cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+ this.getJobGraph().getJobID(), ioe);
+ }
+
+ final Configuration config = this.getConfiguration();
+ config.setClassLoader(cl);
+ final TaskConfig taskConfig = new TaskConfig(config);
+
+ if(taskConfig.hasStubWrapper()){
+ outputFormat = taskConfig.<OutputFormat<?> >getStubWrapper(cl).getUserCodeObject(OutputFormat.class,cl);
+ outputFormat.configure(taskConfig.getStubParameters());
+ }
+ }
+
+ public OutputFormat<?> getOutputFormat() { return outputFormat; }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
index 61eb66c..8672aeb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
@@ -13,7 +13,6 @@
package eu.stratosphere.nephele.jobgraph;
-import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.template.AbstractTask;
/**
@@ -84,20 +83,4 @@ public class JobTaskVertex extends AbstractJobVertex {
public Class<? extends AbstractTask> getTaskClass() {
return (Class<? extends AbstractTask>) this.invokableClass;
}
-
-
- @Override
- public int getMaximumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- // Delegate call to invokable
- return invokable.getMaximumNumberOfSubtasks();
- }
-
-
- @Override
- public int getMinimumNumberOfSubtasks(final AbstractInvokable invokable) {
-
- // Delegate call to invokable
- return invokable.getMinimumNumberOfSubtasks();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index bbef991..790aca9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -102,18 +102,7 @@ public final class InputSplitManager {
continue;
}
- final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
- if (!(invokable instanceof AbstractInputTask)) {
- LOG.error(groupVertex.getName() + " has " + inputSplits.length
- + " input splits, but is not of typt AbstractInputTask, ignoring...");
- continue;
- }
-
- @SuppressWarnings("unchecked")
- final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
- final Class<? extends InputSplit> splitType = inputTask.getInputSplitType();
-
- final InputSplitAssigner assigner = getAssignerByType(splitType, true);
+ final InputSplitAssigner assigner = getAssignerByType(groupVertex.getInputSplitType(), true);
// Add entry to cache for fast retrieval during the job execution
this.assignerCache.put(groupVertex, assigner);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 3717fbf..1e6929d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -49,18 +49,8 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
@Override
public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
- // Do some sanity checks first
- final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
-
- // if (!(invokable instanceof AbstractFileInputTask)) {
- // LOG.error(groupVertex.getName() + " is not an input vertex, ignoring vertex...");
- // return;
- // }
-
- @SuppressWarnings("unchecked")
- final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
- if (!LocatableInputSplit.class.isAssignableFrom(inputTask.getInputSplitType())) {
- LOG.error(groupVertex.getName() + " produces input splits of type " + inputTask.getInputSplitType()
+ if (!LocatableInputSplit.class.isAssignableFrom(groupVertex.getInputSplitType())) {
+ LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
+ " and cannot be handled by this split assigner");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 7894334..048562c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -50,18 +50,8 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
@Override
public void registerGroupVertex(final ExecutionGroupVertex groupVertex) {
- // Do some sanity checks first
- final AbstractInvokable invokable = groupVertex.getEnvironment().getInvokable();
-
- // if (!(invokable instanceof AbstractFileInputTask)) {
- // LOG.error(groupVertex.getName() + " is not an input vertex, ignoring vertex...");
- // return;
- // }
-
- @SuppressWarnings("unchecked")
- final AbstractInputTask<? extends InputSplit> inputTask = (AbstractInputTask<? extends InputSplit>) invokable;
- if (!FileInputSplit.class.equals(inputTask.getInputSplitType())) {
- LOG.error(groupVertex.getName() + " produces input splits of type " + inputTask.getInputSplitType()
+ if (!FileInputSplit.class.equals(groupVertex.getInputSplitType())) {
+ LOG.error(groupVertex.getName() + " produces input splits of type " + groupVertex.getInputSplitType()
+ " and cannot be handled by this split assigner");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
deleted file mode 100644
index d16e757..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileInputTask.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import eu.stratosphere.core.fs.BlockLocation;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-
-/**
- * Specialized subtype of {@link AbstractInputTask} for tasks which are supposed to generate input from
- * a file. In addition to {@link AbstractInputTask} this class includes a method to query file splits
- * which should be read during the task's execution.
- *
- */
-public abstract class AbstractFileInputTask extends AbstractInputTask<FileInputSplit> {
-
- public static final String INPUT_PATH_CONFIG_KEY = "input.path";
-
- /**
- * The fraction that the last split may be larger than the others.
- */
- private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Returns an iterator to a (possible empty) list of file input splits which is expected to be consumed by this
- * instance of the {@link AbstractFileInputTask}.
- *
- * @return an iterator to a (possible empty) list of file input splits.
- */
- public Iterator<FileInputSplit> getFileInputSplits() {
-
- return new InputSplitIterator<FileInputSplit>(getEnvironment().getInputSplitProvider());
- }
-
-
- @Override
- public FileInputSplit[] computeInputSplits(final int minNumSplits) throws IOException {
-
- final String pathURI = getTaskConfiguration().getString(INPUT_PATH_CONFIG_KEY, null);
- if (pathURI == null) {
- throw new IOException("The path to the file was not found in the runtime configuration.");
- }
-
- final Path path;
- try {
- path = new Path(pathURI);
- } catch (Exception iaex) {
- throw new IOException("Invalid file path specifier: ", iaex);
- }
-
- final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
-
- // get all the files that are involved in the splits
- final List<FileStatus> files = new ArrayList<FileStatus>();
- long totalLength = 0;
-
- final FileSystem fs = path.getFileSystem();
- final FileStatus pathFile = fs.getFileStatus(path);
-
- if (pathFile.isDir()) {
- // input is directory. list all contained files
- final FileStatus[] dir = fs.listStatus(path);
- for (int i = 0; i < dir.length; i++) {
- if (!dir[i].isDir()) {
- files.add(dir[i]);
- totalLength += dir[i].getLen();
- }
- }
-
- } else {
- files.add(pathFile);
- totalLength += pathFile.getLen();
- }
-
- final long minSplitSize = 1;
- final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
- (totalLength % minNumSplits == 0 ? 0 : 1));
-
- // now that we have the files, generate the splits
- int splitNum = 0;
- for (final FileStatus file : files) {
-
- final long len = file.getLen();
- final long blockSize = file.getBlockSize();
-
- final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
- final long halfSplit = splitSize >>> 1;
-
- final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
-
- if (len > 0) {
-
- // get the block locations and make sure they are in order with respect to their offset
- final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
- Arrays.sort(blocks);
-
- long bytesUnassigned = len;
- long position = 0;
-
- int blockIndex = 0;
-
- while (bytesUnassigned > maxBytesForLastSplit) {
- // get the block containing the majority of the data
- blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
- // create a new split
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
- blocks[blockIndex]
- .getHosts());
- inputSplits.add(fis);
-
- // adjust the positions
- position += splitSize;
- bytesUnassigned -= splitSize;
- }
-
- // assign the last split
- if (bytesUnassigned > 0) {
- blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
- bytesUnassigned,
- blocks[blockIndex].getHosts());
- inputSplits.add(fis);
- }
- } else {
- // special case with a file of zero bytes size
- final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
- String[] hosts;
- if (blocks.length > 0) {
- hosts = blocks[0].getHosts();
- } else {
- hosts = new String[0];
- }
- final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
- inputSplits.add(fis);
- }
- }
-
- return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
- }
-
- /**
- * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
- * offset.
- *
- * @param blocks
- * The different blocks of the file. Must be ordered by their offset.
- * @param offset
- * The offset of the position in the file.
- * @param startIndex
- * The earliest index to look at.
- * @return The index of the block containing the given position.
- */
- private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
- final long halfSplitSize, final int startIndex) {
-
- // go over all indexes after the startIndex
- for (int i = startIndex; i < blocks.length; i++) {
- long blockStart = blocks[i].getOffset();
- long blockEnd = blockStart + blocks[i].getLength();
-
- if (offset >= blockStart && offset < blockEnd) {
- // got the block where the split starts
- // check if the next block contains more than this one does
- if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
- return i + 1;
- } else {
- return i;
- }
- }
- }
- throw new IllegalArgumentException("The given offset is not contained in the any block.");
- }
-
-
- @Override
- public Class<FileInputSplit> getInputSplitType() {
-
- return FileInputSplit.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
deleted file mode 100644
index 5f231c1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractFileOutputTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.Path;
-
-/**
- * Specialized subtype of {@link AbstractOutputTask} for tasks which are supposed to write output to
- * a file.
- *
- */
-public abstract class AbstractFileOutputTask extends AbstractOutputTask {
-
- /**
- * Returns the output path which has been assigned to the original {@link JobFileOutputVertex}.
- *
- * @return the output path which has been assigned to the original {@link JobFileOutputVertex} or <code>null</code>
- * if the path cannot be retrieved
- */
- public Path getFileOutputPath() {
-
- // TODO: This is a quick workaround, problem can be solved in a more generic way
- final Configuration conf = getEnvironment().getTaskConfiguration();
-
- final String outputPath = conf.getString("outputPath", null);
-
- if (outputPath != null) {
- return new Path(outputPath);
- }
-
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
deleted file mode 100644
index cf6d916..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractGenericInputTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.core.io.GenericInputSplit;
-
-/**
- * An input task that processes generic input splits (partitions).
- */
-public abstract class AbstractGenericInputTask extends AbstractInputTask<GenericInputSplit> {
-
-
- @Override
- public GenericInputSplit[] computeInputSplits(final int requestedMinNumber) throws Exception {
- GenericInputSplit[] splits = new GenericInputSplit[requestedMinNumber];
- for (int i = 0; i < requestedMinNumber; i++) {
- splits[i] = new GenericInputSplit(i,requestedMinNumber);
- }
- return splits;
- }
-
-
- @Override
- public Class<GenericInputSplit> getInputSplitType() {
-
- return GenericInputSplit.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
index 76c9377..88e4fcb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInputTask.java
@@ -27,29 +27,6 @@ import eu.stratosphere.core.io.InputSplit;
public abstract class AbstractInputTask<T extends InputSplit> extends AbstractInvokable {
/**
- * This method computes the different splits of the input that can be processed in parallel. It needs
- * to be implemented by classes that describe input tasks.
- * <p>
- * Note that this method does not return the input splits for the task instance only, but it computes all splits for
- * all parallel instances. Those computed splits are then assigned to the individual task instances by the Job
- * Manager. To obtain the input splits for the current task instance, use the {@link #getTaskInputSplits()} method.
- *
- * @param requestedMinNumber
- * The minimum number of splits to create. This is a hint by the system how many splits
- * should be generated at least (typically because there are that many parallel task
- * instances), but it is no hard constraint
- * @return The input splits for the input to be processed by all instances of this input task
- */
- public abstract T[] computeInputSplits(int requestedMinNumber) throws Exception;
-
- /**
- * Returns the type of input splits that is generated by this input task.
- *
- * @return the type of input splits that is generated by this input task
- */
- public abstract Class<T> getInputSplitType();
-
- /**
* Returns an iterator to a (possible empty) list of input splits which is expected to be consumed by this
* instance of the {@link AbstractInputTask}.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
index a37f592..79390f8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/AbstractInvokable.java
@@ -62,39 +62,6 @@ public abstract class AbstractInvokable {
return this.environment;
}
- /**
- * Overwrite this method to implement task specific checks if the
- * respective task has been configured properly.
- *
- * @throws IllegalConfigurationException
- * thrown if the respective tasks is not configured properly
- */
- public void checkConfiguration() throws IllegalConfigurationException {
- // The default implementation does nothing
- }
-
- /**
- * Overwrite this method to provide the minimum number of subtasks the respective task
- * must be split into at runtime.
- *
- * @return the minimum number of subtasks the respective task must be split into at runtime
- */
- public int getMinimumNumberOfSubtasks() {
- // The default implementation always returns 1
- return 1;
- }
-
- /**
- * Overwrite this method to provide the maximum number of subtasks the respective task
- * can be split into at runtime.
- *
- * @return the maximum number of subtasks the respective task can be split into at runtime, <code>-1</code> for
- * infinity
- */
- public int getMaximumNumberOfSubtasks() {
- // The default implementation always returns -1
- return -1;
- }
/**
* Returns the current number of subtasks the respective task is split into.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
deleted file mode 100644
index c2cbbc1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/template/GenericInputTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.template;
-
-import eu.stratosphere.core.io.GenericInputSplit;
-
-/**
- * An input task that processes generic input splits (partitions).
- */
-public abstract class GenericInputTask extends AbstractInputTask<GenericInputSplit> {
-
-
- @Override
- public GenericInputSplit[] computeInputSplits(final int requestedMinNumber) throws Exception {
- GenericInputSplit[] splits = new GenericInputSplit[requestedMinNumber];
- for (int i = 0; i < requestedMinNumber; i++) {
- splits[i] = new GenericInputSplit(i, requestedMinNumber);
- }
- return splits;
- }
-
-
- @Override
- public Class<GenericInputSplit> getInputSplitType() {
-
- return GenericInputSplit.class;
- }
-}