You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:42 UTC
[21/22] git commit: Merge fix to omit input/output registering on JobManager;
Rework Invokable Task Hierarchy
Merge fix to omit input/output registering on JobManager
Rework Invokable Task Hierarchy
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8c1d82a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8c1d82a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8c1d82a8
Branch: refs/heads/master
Commit: 8c1d82a8ec674de6525319501c6be2674e3143f1
Parents: 2692643
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 20 21:13:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:21 2014 +0200
----------------------------------------------------------------------
.../stratosphere/client/program/ClientTest.java | 29 +-
.../plantranslate/NepheleJobGraphGenerator.java | 66 ++---
.../stratosphere/api/common/PlanExecutor.java | 2 -
.../api/common/io/FileOutputFormat.java | 291 +++++--------------
.../stratosphere/api/common/io/FormatUtil.java | 1 -
.../api/common/io/InitializeOnMaster.java | 35 +++
.../api/common/io/OutputFormat.java | 15 +-
.../configuration/Configuration.java | 16 +-
.../eu/stratosphere/core/fs/FileSystem.java | 3 +-
.../eu/stratosphere/core/io/StringRecord.java | 6 +-
.../eu/stratosphere/util/IterableIterator.java | 4 +-
.../api/java/io/PrintingOutputFormat.java | 3 -
.../nephele/execution/RuntimeEnvironment.java | 1 -
.../nephele/executiongraph/ExecutionGraph.java | 104 ++++---
.../executiongraph/ExecutionGroupVertex.java | 1 -
.../jobgraph/AbstractJobInputVertex.java | 19 +-
.../jobgraph/AbstractJobOutputVertex.java | 9 +-
.../nephele/jobgraph/AbstractJobVertex.java | 31 +-
.../stratosphere/nephele/jobgraph/JobGraph.java | 31 +-
.../nephele/jobgraph/JobInputVertex.java | 155 ++--------
.../nephele/jobgraph/JobOutputVertex.java | 132 ++-------
.../nephele/jobgraph/JobTaskVertex.java | 51 +---
.../nephele/jobmanager/JobManager.java | 4 +-
.../splitassigner/InputSplitManager.java | 2 -
.../LocatableInputSplitAssigner.java | 2 -
.../file/FileInputSplitAssigner.java | 5 -
.../nephele/taskmanager/TaskManager.java | 2 +-
.../nephele/template/AbstractInputTask.java | 79 -----
.../nephele/template/AbstractInvokable.java | 1 -
.../nephele/template/AbstractOutputTask.java | 22 --
.../nephele/template/AbstractTask.java | 21 --
.../runtime/iterative/io/FakeOutputTask.java | 4 +-
.../task/IterationSynchronizationSinkTask.java | 4 +-
.../iterative/task/IterationTailPactTask.java | 8 +-
.../pact/runtime/task/DataSinkTask.java | 10 +-
.../pact/runtime/task/DataSourceTask.java | 109 +++----
.../pact/runtime/task/RegularPactTask.java | 16 +-
.../pact/runtime/task/util/TaskConfig.java | 6 +-
.../runtime/io/api/MutableRecordReader.java | 38 +--
.../runtime/io/api/RecordReader.java | 18 +-
.../runtime/io/api/RecordWriter.java | 22 +-
.../executiongraph/ExecutionGraphTest.java | 163 ++++++-----
.../ForwardTask1Input1Output.java | 4 +-
.../ForwardTask1Input2Outputs.java | 4 +-
.../ForwardTask2Inputs1Output.java | 4 +-
.../executiongraph/SelfCrossForwardTask.java | 13 +-
.../nephele/jobmanager/DoubleSourceTask.java | 132 +++++++++
.../nephele/jobmanager/DoubleTargetTask.java | 24 +-
.../jobmanager/ExceptionOutputFormat.java | 26 +-
.../nephele/jobmanager/ExceptionTask.java | 11 +-
.../nephele/jobmanager/ForwardTask.java | 16 +-
.../nephele/jobmanager/JobManagerITCase.java | 158 +++++-----
.../jobmanager/RuntimeExceptionTask.java | 13 +-
.../nephele/jobmanager/UnionTask.java | 22 +-
.../scheduler/queue/DefaultSchedulerTest.java | 68 ++---
.../nephele/util/tasks/DoubleSourceTask.java | 134 +++++++++
.../nephele/util/tasks/FileLineReader.java | 133 +++++++++
.../nephele/util/tasks/FileLineWriter.java | 72 +++++
.../nephele/util/tasks/JobFileInputVertex.java | 255 ++++++++++++++++
.../nephele/util/tasks/JobFileOutputVertex.java | 109 +++++++
.../runtime/hash/HashMatchIteratorITCase.java | 4 +-
.../runtime/hash/ReOpenableHashTableITCase.java | 3 +-
.../pact/runtime/io/ChannelViewsTest.java | 4 +-
.../pact/runtime/io/SpillingBufferTest.java | 4 +-
.../sort/AsynchonousPartialSorterITCase.java | 10 +-
.../CombiningUnilateralSortMergerITCase.java | 4 +-
.../pact/runtime/sort/ExternalSortITCase.java | 8 +-
.../sort/MassiveStringSortingITCase.java | 8 +-
.../sort/SortMergeMatchIteratorITCase.java | 11 +-
.../task/util/HashVsSortMiniBenchmark.java | 4 +-
.../pact/runtime/test/util/DummyInvokable.java | 6 +-
.../pact/runtime/test/util/TaskTestBase.java | 15 +-
.../bufferprovider/LocalBufferPoolTest.java | 6 +
.../TransitiveClosureITCase.java | 2 +-
.../test/iterative/nephele/JobGraphUtils.java | 13 +-
.../recordJobs/util/DiscardingOutputFormat.java | 20 +-
.../test/runtime/NetworkStackThroughput.java | 47 ++-
77 files changed, 1567 insertions(+), 1341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
index b3f8159..a948706 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
@@ -28,10 +28,7 @@ import org.mockito.Mock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.LocalEnvironment;
-import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.costs.CostEstimator;
@@ -134,16 +131,16 @@ public class ClientTest {
verify(this.jobClientMock).submitJob();
}
-
- @Test(expected=InvalidProgramException.class)
- public void tryLocalExecution() throws Exception {
- new Client(configMock);
- LocalExecutor.execute(planMock);
- }
-
- @Test(expected=InvalidProgramException.class)
- public void tryLocalEnvironmentExecution() throws Exception {
- new Client(configMock);
- new LocalEnvironment();
- }
-}
+//
+// @Test(expected=InvalidProgramException.class)
+// public void tryLocalExecution() throws Exception {
+// new Client(configMock);
+// LocalExecutor.execute(planMock);
+// }
+//
+// @Test(expected=InvalidProgramException.class)
+// public void tryLocalEnvironmentExecution() throws Exception {
+// new Client(configMock);
+// new LocalEnvironment();
+// }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3089cdb..3c1e9e3 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -20,14 +20,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-
-import eu.stratosphere.api.common.io.InputFormat;
-import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map.Entry;
import eu.stratosphere.api.common.aggregators.AggregatorRegistry;
import eu.stratosphere.api.common.aggregators.AggregatorWithName;
@@ -66,7 +59,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
@@ -760,7 +752,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
} else {
// create task vertex
vertex = new JobTaskVertex(taskName, this.jobGraph);
- vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+ vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
config = new TaskConfig(vertex.getConfiguration());
config.setDriver(ds.getDriverClass());
@@ -786,7 +778,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final DriverStrategy ds = node.getDriverStrategy();
final JobTaskVertex vertex = new JobTaskVertex(taskName, this.jobGraph);
final TaskConfig config = new TaskConfig(vertex.getConfiguration());
- vertex.setTaskClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+ vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
// set user code
config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
@@ -812,31 +804,29 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph);
+ final TaskConfig config = new TaskConfig(vertex.getConfiguration());
- // set task class
- @SuppressWarnings("unchecked")
- final Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask
- .class;
- vertex.setInputClass(clazz);
+ vertex.setInvokableClass(DataSourceTask.class);
// set user code
- vertex.setInputFormat((UserCodeWrapper<? extends InputFormat<?, InputSplit>>)node.getPactContract()
- .getUserCodeWrapper());
- vertex.setInputFormatParameters(node.getPactContract().getParameters());
- vertex.setOutputSerializer(node.getSerializer());
+ config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
+ config.setStubParameters(node.getPactContract().getParameters());
+
+ config.setOutputSerializer(node.getSerializer());
return vertex;
}
private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph);
+ final TaskConfig config = new TaskConfig(vertex.getConfiguration());
- vertex.setOutputClass(DataSinkTask.class);
+ vertex.setInvokableClass(DataSinkTask.class);
vertex.getConfiguration().setInteger(DataSinkTask.DEGREE_OF_PARALLELISM_KEY, node.getDegreeOfParallelism());
// set user code
- vertex.setOutputFormat((UserCodeWrapper<? extends OutputFormat<?>>)node.getPactContract().getUserCodeWrapper());
- vertex.setOutputFormatParameters(node.getPactContract().getParameters());
-
+ config.setStubWrapper(node.getPactContract().getUserCodeWrapper());
+ config.setStubParameters(node.getPactContract().getParameters());
+
return vertex;
}
@@ -884,7 +874,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
}
// reset the vertex type to iteration head
- headVertex.setTaskClass(IterationHeadPactTask.class);
+ headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
toReturn = null;
} else {
@@ -892,7 +882,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
headVertex = new JobTaskVertex("PartialSolution ("+iteration.getNodeName()+")", this.jobGraph);
- headVertex.setTaskClass(IterationHeadPactTask.class);
+ headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
toReturn = headVertex;
@@ -952,7 +942,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
}
// reset the vertex type to iteration head
- headVertex.setTaskClass(IterationHeadPactTask.class);
+ headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
toReturn = null;
} else {
@@ -960,7 +950,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// everything else happens in the post visit, after the input (the initial partial solution)
// is connected.
headVertex = new JobTaskVertex("IterationHead("+iteration.getNodeName()+")", this.jobGraph);
- headVertex.setTaskClass(IterationHeadPactTask.class);
+ headVertex.setInvokableClass(IterationHeadPactTask.class);
headConfig = new TaskConfig(headVertex.getConfiguration());
headConfig.setDriver(NoOpDriver.class);
toReturn = headVertex;
@@ -1144,7 +1134,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// --------------------------- create the sync task ---------------------------
final JobOutputVertex sync = new JobOutputVertex("Sync(" +
bulkNode.getNodeName() + ")", this.jobGraph);
- sync.setOutputClass(IterationSynchronizationSinkTask.class);
+ sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setNumberOfSubtasks(1);
this.auxVertices.add(sync);
@@ -1192,14 +1182,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// No following termination criterion
if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {
- rootOfStepFunctionVertex.setTaskClass(IterationTailPactTask.class);
+ rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
- fakeTail.setOutputClass(FakeOutputTask.class);
+ fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);
@@ -1234,14 +1224,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
}
- rootOfTerminationCriterionVertex.setTaskClass(IterationTailPactTask.class);
+ rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
// Hack
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
- fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
+ fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTailTerminationCriterion);
@@ -1309,7 +1299,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
{
final JobOutputVertex sync = new JobOutputVertex("Sync (" +
iterNode.getNodeName() + ")", this.jobGraph);
- sync.setOutputClass(IterationSynchronizationSinkTask.class);
+ sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setNumberOfSubtasks(1);
this.auxVertices.add(sync);
@@ -1367,14 +1357,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
worksetTailConfig.setIsWorksetUpdate();
if (hasWorksetTail) {
- nextWorksetVertex.setTaskClass(IterationTailPactTask.class);
+ nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
- fakeTail.setOutputClass(FakeOutputTask.class);
+ fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);
@@ -1405,14 +1395,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
solutionDeltaConfig.setIsSolutionSetUpdate();
if (hasSolutionSetTail) {
- solutionDeltaVertex.setTaskClass(IterationTailPactTask.class);
+ solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
- fakeTail.setOutputClass(FakeOutputTask.class);
+ fakeTail.setInvokableClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
this.auxVertices.add(fakeTail);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
index 7caaab2..d91abf8 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/PlanExecutor.java
@@ -53,7 +53,6 @@ public abstract class PlanExecutor {
* Creates an executor that runs the plan locally in a multi-threaded environment.
*
* @return A local executor.
- * @see eu.stratosphere.client.LocalExecutor
*/
public static PlanExecutor createLocalExecutor() {
Class<? extends PlanExecutor> leClass = loadExecutorClass(LOCAL_EXECUTOR_CLASS);
@@ -75,7 +74,6 @@ public abstract class PlanExecutor {
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
* from within the UDFs.
* @return A remote executor.
- * @see eu.stratosphere.client.RemoteExecutor
*/
public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) {
if (hostname == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
index c4e1d5a..d43c987 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FileOutputFormat.java
@@ -31,14 +31,14 @@ import eu.stratosphere.core.fs.Path;
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+
private static final long serialVersionUID = 1L;
// --------------------------------------------------------------------------------------------
/**
- * Defines the behavior for creating output directories.
- *
+ * Behavior for creating output directories.
*/
public static enum OutputDirectoryMode {
@@ -54,7 +54,7 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
private static WriteMode DEFAULT_WRITE_MODE;
- private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
+ private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
private static final void initDefaultsFromConfiguration() {
@@ -100,11 +100,6 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
*/
private OutputDirectoryMode outputDirectoryMode;
- /**
- * Stream opening timeout.
- */
- private long openTimeout = -1;
-
// --------------------------------------------------------------------------------------------
/**
@@ -158,19 +153,6 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
return this.outputDirectoryMode;
}
-
- public void setOpenTimeout(long timeout) {
- if (timeout < 0) {
- throw new IllegalArgumentException("The timeout must be a nonnegative numer of milliseconds (zero for infinite).");
- }
-
- this.openTimeout = (timeout == 0) ? Long.MAX_VALUE : timeout;
- }
-
- public long getOpenTimeout() {
- return this.openTimeout;
- }
-
// ----------------------------------------------------------------
@Override
@@ -200,34 +182,58 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
if (this.outputDirectoryMode == null) {
this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
}
-
- if (this.openTimeout == -1) {
- this.openTimeout = FileInputFormat.getDefaultOpeningTimeout();
- }
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
+ if (taskNumber < 0 || numTasks < 1) {
+ throw new IllegalArgumentException("TaskNumber: " + taskNumber + ", numTasks: " + numTasks);
+ }
if (LOG.isDebugEnabled()) {
- LOG.debug("Openint stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
- ", OutputDirectoryMode=" + outputDirectoryMode + ", timeout=" + openTimeout);
+ LOG.debug("Opening stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
+ ", OutputDirectoryMode=" + outputDirectoryMode);
}
- // obtain FSDataOutputStream asynchronously, since HDFS client is vulnerable to InterruptedExceptions
- OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks);
- opot.start();
+ Path p = this.outputFilePath;
+ if (p == null) {
+ throw new IOException("The file path is null.");
+ }
- try {
- // get FSDataOutputStream
- this.stream = opot.waitForCompletion();
+ final FileSystem fs = p.getFileSystem();
+
+ // if this is a local file system, we need to initialize the local output directory here
+ if (!fs.isDistributedFS()) {
+
+ if (numTasks == 1 && outputDirectoryMode == OutputDirectoryMode.PARONLY) {
+ // output should go to a single file
+
+ // prepare local output path. checks for write mode and removes existing files in case of OVERWRITE mode
+ if(!fs.initOutPathLocalFS(p, writeMode, false)) {
+ // output preparation failed! Cancel task.
+ throw new IOException("Output path could not be initialized. Canceling task...");
+ }
+ }
+ else {
+ // numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS
+
+ if(!fs.initOutPathLocalFS(p, writeMode, true)) {
+ // output preparation failed! Cancel task.
+ throw new IOException("Output directory could not be created. Canceling task...");
+ }
+ }
}
- catch (Exception e) {
- throw new RuntimeException("Stream to output file could not be opened: " + e.getMessage(), e);
+
+
+ // Suffix the path with the parallel instance index, if needed
+ if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
+ p = p.suffix("/" + (taskNumber+1));
}
- }
+ // create output file
+ this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+ }
@Override
public void close() throws IOException {
@@ -238,153 +244,37 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
}
}
- // ============================================================================================
-
- private static final class OutputPathOpenThread extends Thread {
-
- private final Path path;
-
- private final int taskIndex;
-
- private final int numTasks;
-
- private final WriteMode writeMode;
-
- private final OutputDirectoryMode outDirMode;
-
- private final long timeoutMillies;
-
- private volatile FSDataOutputStream fdos;
-
- private volatile Throwable error;
-
- private volatile boolean aborted;
-
-
- public OutputPathOpenThread(FileOutputFormat<?> fof, int taskIndex, int numTasks) {
- this.path = fof.getOutputFilePath();
- this.writeMode = fof.getWriteMode();
- this.outDirMode = fof.getOutputDirectoryMode();
- this.timeoutMillies = fof.getOpenTimeout();
- this.taskIndex = taskIndex;
- this.numTasks = numTasks;
- }
-
- @Override
- public void run() {
-
- try {
- Path p = this.path;
- final FileSystem fs = p.getFileSystem();
-
- // initialize output path.
- if(this.numTasks == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
- // output is not written in parallel and should go to a single file
-
- if(!fs.isDistributedFS()) {
- // prepare local output path
- // checks for write mode and removes existing files in case of OVERWRITE mode
- if(!fs.initOutPathLocalFS(p, writeMode, false)) {
- // output preparation failed! Cancel task.
- throw new IOException("Output path could not be initialized. Canceling task.");
- }
- }
-
- } else if(this.numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS) {
- // output is written in parallel into a directory or should always be written to a directory
-
- if(!fs.isDistributedFS()) {
- // File system is not distributed.
- // We need to prepare the output path on each executing node.
- if(!fs.initOutPathLocalFS(p, writeMode, true)) {
- // output preparation failed! Cancel task.
- throw new IOException("Output directory could not be created. Canceling task.");
- }
- }
-
- // Suffix the path with the parallel instance index
- p = p.suffix("/" + this.taskIndex);
-
- } else {
- // invalid number of subtasks (<= 0)
- throw new IllegalArgumentException("Invalid number of subtasks. Canceling task.");
- }
-
- // create output file
- switch(writeMode) {
- case NO_OVERWRITE:
- this.fdos = fs.create(p, false);
- break;
- case OVERWRITE:
- this.fdos = fs.create(p, true);
- break;
- default:
- throw new IllegalArgumentException("Invalid write mode: "+writeMode);
- }
-
- // check for canceling and close the stream in that case, because no one will obtain it
- if (this.aborted) {
- final FSDataOutputStream f = this.fdos;
- this.fdos = null;
- f.close();
- }
- }
- catch (Throwable t) {
- this.error = t;
- }
- }
+ /**
+ * Initialization of the distributed file system if it is used.
+ *
+ * @param parallelism The task parallelism.
+ */
+ @Override
+ public void initializeGlobal(int parallelism) throws IOException {
+ final Path path = getOutputFilePath();
+ final FileSystem fs = path.getFileSystem();
- public FSDataOutputStream waitForCompletion() throws Exception {
- final long start = System.currentTimeMillis();
- long remaining = this.timeoutMillies;
+ // only distributed file systems can be initialized at start-up time.
+ if (fs.isDistributedFS()) {
- do {
- try {
- this.join(remaining);
- } catch (InterruptedException iex) {
- // we were canceled, so abort the procedure
- abortWait();
- throw iex;
+ final WriteMode writeMode = getWriteMode();
+ final OutputDirectoryMode outDirMode = getOutputDirectoryMode();
+
+ if (parallelism == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
+ // output is not written in parallel and should be written to a single file.
+ // prepare distributed output path
+ if(!fs.initOutPathDistFS(path, writeMode, false)) {
+ // output preparation failed! Cancel task.
+ throw new IOException("Output path could not be initialized.");
}
- }
- while (this.error == null && this.fdos == null &&
- (remaining = this.timeoutMillies + start - System.currentTimeMillis()) > 0);
-
- if (this.error != null) {
- throw new IOException("Opening the file output stream failed" +
- (this.error.getMessage() == null ? "." : ": " + this.error.getMessage()), this.error);
- }
-
- if (this.fdos != null) {
- return this.fdos;
+
} else {
- // double-check that the stream has not been set by now. we don't know here whether
- // a) the opener thread recognized the canceling and closed the stream
- // b) the flag was set such that the stream did not see it and we have a valid stream
- // In any case, close the stream and throw an exception.
- abortWait();
-
- final boolean stillAlive = this.isAlive();
- final StringBuilder bld = new StringBuilder(256);
- for (StackTraceElement e : this.getStackTrace()) {
- bld.append("\tat ").append(e.toString()).append('\n');
+ // output should be written to a directory
+
+ // only distributed file systems can be initialized at start-up time.
+ if(!fs.initOutPathDistFS(path, writeMode, true)) {
+ throw new IOException("Output directory could not be created.");
}
- throw new IOException("Output opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") +
- " alive. Stack:\n" + bld.toString());
- }
- }
-
- /**
- * Double checked procedure setting the abort flag and closing the stream.
- */
- private final void abortWait() {
- this.aborted = true;
- final FSDataOutputStream outStream = this.fdos;
- this.fdos = null;
- if (outStream != null) {
- try {
- outStream.close();
- } catch (Throwable t) {}
}
}
}
@@ -437,47 +327,4 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
super(targetConfig);
}
}
-
- /**
- * Initialization of the distributed file system if it is used.
- *
- * @param configuration The task configuration
- */
- @Override
- public void initialize(Configuration configuration){
- final Path path = this.getOutputFilePath();
- final WriteMode writeMode = this.getWriteMode();
- final OutputDirectoryMode outDirMode = this.getOutputDirectoryMode();
-
- // Prepare output path and determine max DOP
- try {
- final FileSystem fs = path.getFileSystem();
-
- int dop = configuration.getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
- if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
- // output is not written in parallel and should be written to a single file.
-
- if(fs.isDistributedFS()) {
- // prepare distributed output path
- if(!fs.initOutPathDistFS(path, writeMode, false)) {
- // output preparation failed! Cancel task.
- throw new IOException("Output path could not be initialized.");
- }
- }
- } else {
- // output should be written to a directory
-
- if(fs.isDistributedFS()) {
- // only distributed file systems can be initialized at start-up time.
- if(!fs.initOutPathDistFS(path, writeMode, true)) {
- throw new IOException("Output directory could not be created.");
- }
- }
- }
- }
- catch (IOException e) {
- LOG.error("Could not access the file system to detemine the status of the output.", e);
- throw new RuntimeException("I/O Error while accessing file", e);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
index f191c61..ec1033e 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/FormatUtil.java
@@ -153,7 +153,6 @@ public class FormatUtil {
{
final F outputFormat = ReflectionUtil.newInstance(outputFormatClass);
outputFormat.setOutputFilePath(new Path(path));
- outputFormat.setOpenTimeout(0);
outputFormat.setWriteMode(WriteMode.OVERWRITE);
configuration = configuration == null ? new Configuration() : configuration;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
new file mode 100644
index 0000000..86fdee2
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/InitializeOnMaster.java
@@ -0,0 +1,35 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.io;
+
+import java.io.IOException;
+
+/**
+ * This interface may be implemented by {@link OutputFormat}s to have the master initialize them globally.
+ *
+ * For example, the {@link FileOutputFormat} implements this behavior for distributed file systems and
+ * creates/deletes target directories if necessary.
+ */
+public interface InitializeOnMaster {
+
+ /**
+ * The method is invoked on the master (JobManager) before the distributed program execution starts.
+ *
+ * @param parallelism The degree of parallelism with which the format or functions will be run.
+ * @throws IOException The initialization may throw exceptions, which may cause the job to abort.
+ */
+ void initializeGlobal(int parallelism) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
index bdc59e4..72dddf4 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/OutputFormat.java
@@ -20,14 +20,13 @@ import eu.stratosphere.configuration.Configuration;
/**
- * Describes the base interface that is used describe an output that consumes records. The output format
+ * The base interface for outputs that consume records. The output format
* describes how to store the final records, for example in a file.
* <p>
* The life cycle of an output format is the following:
* <ol>
- * <li>After being instantiated (parameterless), it is configured with a {@link Configuration} object.
- * Basic fields are read from the configuration, such as for example a file path, if the format describes
- * files as the sink for the records.</li>
+ * <li>configure() is invoked a single time. The method can be used to implement initialization from
+ * the parameters (configuration) that may be attached upon instantiation.</li>
* <li>Each parallel output task creates an instance, configures it and opens it.</li>
* <li>All records of its parallel instance are handed to the output format.</li>
* <li>The output format is closed</li>
@@ -79,13 +78,5 @@ public interface OutputFormat<IT> extends Serializable {
* @throws IOException Thrown, if the input could not be closed properly.
*/
void close() throws IOException;
-
- /**
- * Method which is called on the JobManager node prior to execution. It can be used to set up output format
- * related tasks.
- *
- * @param configuration The task configuration
- */
- void initialize(Configuration configuration);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
index 451577f..6b9436b 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/Configuration.java
@@ -34,17 +34,19 @@ import eu.stratosphere.core.io.StringRecord;
* This class is thread-safe.
*
*/
-public class Configuration implements IOReadableWritable {
+public class Configuration implements IOReadableWritable, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
/**
* Stores the concrete key/value pairs of this configuration object.
*/
- private Map<String, String> confData = new HashMap<String, String>();
+ private final Map<String, String> confData = new HashMap<String, String>();
/**
* The class loader to be used for the <code>getClass</code> method.
*/
- private ClassLoader classLoader;
+ private transient ClassLoader classLoader;
/**
* Constructs a new configuration object.
@@ -446,7 +448,6 @@ public class Configuration implements IOReadableWritable {
// --------------------------------------------------------------------------------------------
-
@Override
public void read(final DataInput in) throws IOException {
@@ -479,6 +480,13 @@ public class Configuration implements IOReadableWritable {
}
}
}
+
+ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ this.classLoader = getClass().getClassLoader();
+ }
+
+ // --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index 11c7007..8e65636 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -459,7 +459,7 @@ public abstract class FileSystem {
// path exists, check write mode
switch (writeMode) {
case NO_OVERWRITE:
- if (status.isDir()) {
+ if (status.isDir() && createDirectory) {
return true;
} else {
// file may not be overwritten
@@ -467,6 +467,7 @@ public abstract class FileSystem {
WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() +
" mode to overwrite existing files and directories.");
}
+
case OVERWRITE:
if (status.isDir()) {
if (createDirectory) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java b/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
index 50c2599..de2358b 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/io/StringRecord.java
@@ -34,6 +34,8 @@ import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.util.Arrays;
+import eu.stratosphere.types.Value;
+
/**
* This class stores text using standard UTF8 encoding. It provides methods to
* serialize, deserialize, and compare texts at byte level. The type of length
@@ -44,7 +46,9 @@ import java.util.Arrays;
* Also includes utilities for serializing/deserialing a string, coding/decoding a string, checking if a byte array
* contains valid UTF8 code, calculating the length of an encoded string.
*/
-public class StringRecord implements IOReadableWritable {
+public class StringRecord implements Value {
+
+ private static final long serialVersionUID = 1L;
private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
protected CharsetEncoder initialValue() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
index 16f610a..b59e2e6 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
@@ -18,9 +18,9 @@ package eu.stratosphere.util;
import java.util.Iterator;
/**
- * An {@link Iterator] that is also {@link Iterable} (often by returning itself).
+ * An {@link Iterator} that is also {@link Iterable} (often by returning itself).
*
- * @param <T> The iterated elements' type.
+ * @param <E> The iterated elements' type.
*/
public interface IterableIterator<E> extends Iterator<E>, Iterable<E> {
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
index d1736d4..5c09439 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrintingOutputFormat.java
@@ -95,7 +95,4 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
-
- @Override
- public void initialize(Configuration configuration){}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index 2416b07..ae5198a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -43,7 +43,6 @@ import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index 1c4a820..18395fb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -31,11 +31,13 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import eu.stratosphere.api.common.io.InitializeOnMaster;
+import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.IllegalConfigurationException;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.execution.ExecutionListener;
import eu.stratosphere.nephele.execution.ExecutionState;
+import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
@@ -45,11 +47,11 @@ import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobEdge;
-import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.nephele.jobgraph.JobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
-import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.util.StringUtils;
@@ -462,42 +464,68 @@ public class ExecutionGraph implements ExecutionListener {
: false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature,
invokableClass);
} catch (Throwable t) {
- throw new GraphConversionException(StringUtils.stringifyException(t));
+ throw new GraphConversionException(t);
}
// Register input and output vertices separately
if (jobVertex instanceof AbstractJobInputVertex) {
- final InputSplit[] inputSplits;
-
+ final AbstractJobInputVertex jobInputVertex = (AbstractJobInputVertex) jobVertex;
+
+ if (jobVertex instanceof JobInputVertex) {
+ try {
+ // get a handle to the user code class loader
+ ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+
+ ((JobInputVertex) jobVertex).initializeInputFormatFromTaskConfig(cl);
+ }
+ catch (Throwable t) {
+ throw new GraphConversionException("Could not deserialize input format.", t);
+ }
+ }
+
final Class<? extends InputSplit> inputSplitType = jobInputVertex.getInputSplitType();
+
+ InputSplit[] inputSplits;
- try{
+ try {
inputSplits = jobInputVertex.getInputSplits(jobVertex.getNumberOfSubtasks());
- }catch(Exception e) {
- throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName() + ": "
- + StringUtils.stringifyException(e));
+ }
+ catch (Throwable t) {
+ throw new GraphConversionException("Cannot compute input splits for " + groupVertex.getName(), t);
}
if (inputSplits == null) {
- LOG.info("Job input vertex " + jobVertex.getName() + " generated 0 input splits");
- } else {
- LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length
- + " input splits");
+ inputSplits = new InputSplit[0];
}
+
+ LOG.info("Job input vertex " + jobVertex.getName() + " generated " + inputSplits.length + " input splits");
// assign input splits and type
groupVertex.setInputSplits(inputSplits);
groupVertex.setInputSplitType(inputSplitType);
}
- if(jobVertex instanceof JobOutputVertex){
+ if (jobVertex instanceof JobOutputVertex){
final JobOutputVertex jobOutputVertex = (JobOutputVertex) jobVertex;
+
+ try {
+ // get a handle to the user code class loader
+ ClassLoader cl = LibraryCacheManager.getClassLoader(jobVertex.getJobGraph().getJobID());
+ jobOutputVertex.initializeOutputFormatFromTaskConfig(cl);
+ }
+ catch (Throwable t) {
+ throw new GraphConversionException("Could not deserialize output format.", t);
+ }
- final OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
-
- if(outputFormat != null){
- outputFormat.initialize(groupVertex.getConfiguration());
+ OutputFormat<?> outputFormat = jobOutputVertex.getOutputFormat();
+ if (outputFormat != null && outputFormat instanceof InitializeOnMaster){
+ try {
+ ((InitializeOnMaster) outputFormat).initializeGlobal(jobVertex.getNumberOfSubtasks());
+ }
+ catch (Throwable t) {
+ throw new GraphConversionException(t);
+ }
}
}
@@ -519,7 +547,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the number of input vertices registered with this execution graph
*/
public int getNumberOfInputVertices() {
-
return this.stages.get(0).getNumberOfInputExecutionVertices();
}
@@ -531,7 +558,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the number of input vertices for the given stage
*/
public int getNumberOfInputVertices(int stage) {
-
if (stage >= this.stages.size()) {
return 0;
}
@@ -545,7 +571,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the number of output vertices registered with this execution graph
*/
public int getNumberOfOutputVertices() {
-
return this.stages.get(0).getNumberOfOutputExecutionVertices();
}
@@ -557,7 +582,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the number of input vertices for the given stage
*/
public int getNumberOfOutputVertices(final int stage) {
-
if (stage >= this.stages.size()) {
return 0;
}
@@ -574,7 +598,6 @@ public class ExecutionGraph implements ExecutionListener {
* exists
*/
public ExecutionVertex getInputVertex(final int index) {
-
return this.stages.get(0).getInputExecutionVertex(index);
}
@@ -587,7 +610,6 @@ public class ExecutionGraph implements ExecutionListener {
* exists
*/
public ExecutionVertex getOutputVertex(final int index) {
-
return this.stages.get(0).getOutputExecutionVertex(index);
}
@@ -602,7 +624,6 @@ public class ExecutionGraph implements ExecutionListener {
* exists in that stage
*/
public ExecutionVertex getInputVertex(final int stage, final int index) {
-
try {
final ExecutionStage s = this.stages.get(stage);
if (s == null) {
@@ -627,7 +648,6 @@ public class ExecutionGraph implements ExecutionListener {
* exists in that stage
*/
public ExecutionVertex getOutputVertex(final int stage, final int index) {
-
try {
final ExecutionStage s = this.stages.get(stage);
if (s == null) {
@@ -649,7 +669,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the execution stage with number <code>num</code> or <code>null</code> if no such execution stage exists
*/
public ExecutionStage getStage(final int num) {
-
try {
return this.stages.get(num);
} catch (ArrayIndexOutOfBoundsException e) {
@@ -663,7 +682,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the number of execution stages in the execution graph
*/
public int getNumberOfStages() {
-
return this.stages.size();
}
@@ -676,7 +694,6 @@ public class ExecutionGraph implements ExecutionListener {
* exists in the execution graph
*/
public ExecutionVertex getVertexByChannelID(final ChannelID id) {
-
final ExecutionEdge edge = this.edgeMap.get(id);
if (edge == null) {
return null;
@@ -697,7 +714,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return the edge whose ID matches <code>id</code> or <code>null</code> if no such edge is known
*/
public ExecutionEdge getEdgeByID(final ChannelID id) {
-
return this.edgeMap.get(id);
}
@@ -708,7 +724,6 @@ public class ExecutionGraph implements ExecutionListener {
* the execution vertex to register
*/
void registerExecutionVertex(final ExecutionVertex vertex) {
-
if (this.vertexMap.put(vertex.getID(), vertex) != null) {
throw new IllegalStateException("There is already an execution vertex with ID " + vertex.getID()
+ " registered");
@@ -724,7 +739,6 @@ public class ExecutionGraph implements ExecutionListener {
* found
*/
public ExecutionVertex getVertexByID(final ExecutionVertexID id) {
-
return this.vertexMap.get(id);
}
@@ -735,7 +749,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return <code>true</code> if stage is completed, <code>false</code> otherwise
*/
private boolean isCurrentStageCompleted() {
-
if (this.indexToCurrentExecutionStage >= this.stages.size()) {
return true;
}
@@ -758,7 +771,6 @@ public class ExecutionGraph implements ExecutionListener {
* @return <code>true</code> if the execution of the graph is finished, <code>false</code> otherwise
*/
public boolean isExecutionFinished() {
-
return (getJobStatus() == InternalJobStatus.FINISHED);
}
@@ -1307,4 +1319,26 @@ public class ExecutionGraph implements ExecutionListener {
}
}
}
+
+ /**
+ * Retrieves the number of slots required to run this execution graph.
+ * @return the largest number of slots required by any single execution stage, or 0 if there are no stages
+ */
+ public int getRequiredSlots(){
+ int maxRequiredSlots = 0;
+
+ final Iterator<ExecutionStage> stageIterator = this.stages.iterator();
+
+ while(stageIterator.hasNext()){
+ final ExecutionStage stage = stageIterator.next();
+
+ int requiredSlots = stage.getRequiredSlots();
+
+ if(requiredSlots > maxRequiredSlots){
+ maxRequiredSlots = requiredSlots;
+ }
+ }
+
+ return maxRequiredSlots;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
index 91e9e53..dceeb90 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java
@@ -15,7 +15,6 @@ package eu.stratosphere.nephele.executiongraph;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.DummyInstance;
import eu.stratosphere.nephele.jobgraph.JobVertexID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
index e4d3b9d..b901742 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobInputVertex.java
@@ -15,11 +15,8 @@ package eu.stratosphere.nephele.jobgraph;
import eu.stratosphere.core.io.InputSplit;
-import java.io.IOException;
-
/**
- * An abstract base class for input vertices in Nephele.
- *
+ * An abstract base class for input vertices.
*/
public abstract class AbstractJobInputVertex extends AbstractJobVertex {
@@ -28,12 +25,24 @@ public abstract class AbstractJobInputVertex extends AbstractJobVertex {
*
* @param name
* the name of the new job input vertex
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ protected AbstractJobInputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ /**
+ * Constructs a new job input vertex with the given name.
+ *
+ * @param name
+ * the name of the new job input vertex
* @param id
* the ID of this vertex
* @param jobGraph
* the job graph this vertex belongs to
*/
- protected AbstractJobInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+ protected AbstractJobInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
super(name, id, jobGraph);
jobGraph.addVertex(this);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
index 849df4b..6020f24 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobOutputVertex.java
@@ -24,14 +24,15 @@ public abstract class AbstractJobOutputVertex extends AbstractJobVertex {
*
* @param name
* the name of the new job output vertex
- * @param id
- * the ID of this vertex
* @param jobGraph
* the job graph this vertex belongs to
*/
- protected AbstractJobOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+ protected AbstractJobOutputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ protected AbstractJobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
super(name, id, jobGraph);
-
jobGraph.addVertex(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index cdadd3c..cc936d9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -18,8 +18,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import org.apache.commons.lang.Validate;
+
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.IllegalConfigurationException;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
@@ -29,8 +30,7 @@ import eu.stratosphere.nephele.util.EnumUtils;
import eu.stratosphere.util.StringUtils;
/**
- * An abstract base class for a job vertex in Nephele.
- *
+ * An abstract base class for a job vertex.
*/
public abstract class AbstractJobVertex implements IOReadableWritable {
@@ -86,19 +86,30 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
*/
protected Class<? extends AbstractInvokable> invokableClass = null;
+
/**
* Constructs a new job vertex and assigns it with the given name.
*
* @param name
* the name of the new job vertex
- * @param id
- * the ID of this vertex
* @param jobGraph
* the job graph this vertex belongs to
*/
- protected AbstractJobVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+ protected AbstractJobVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ /**
+ * Constructs a new job vertex with the given name and ID; a new ID is generated if the given ID is null.
+ *
+ * @param name
+ * the name of the new job vertex
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ protected AbstractJobVertex(String name, JobVertexID id, JobGraph jobGraph) {
this.name = name == null ? DEFAULT_NAME : name;
- this.id = (id == null) ? new JobVertexID() : id;
+ this.id = id == null ? new JobVertexID() : id;
this.jobGraph = jobGraph;
}
@@ -572,13 +583,17 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
return this.configuration;
}
+ public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
+ Validate.notNull(invokable);
+ this.invokableClass = invokable;
+ }
+
/**
* Returns the invokable class which represents the task of this vertex
*
* @return the invokable class, <code>null</code> if it is not set
*/
public Class<? extends AbstractInvokable> getInvokableClass() {
-
return this.invokableClass;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
index f048b0d..3d14d0a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
@@ -26,8 +26,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Stack;
-import java.util.Vector;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FSDataInputStream;
@@ -77,11 +75,6 @@ public class JobGraph implements IOReadableWritable {
private Configuration jobConfiguration = new Configuration();
/**
- * The configuration which should be applied to the task managers involved in processing this job.
- */
- private final Configuration taskManagerConfiguration = new Configuration();
-
- /**
* List of JAR files required to run this job.
*/
private final ArrayList<Path> userJars = new ArrayList<Path>();
@@ -134,24 +127,12 @@ public class JobGraph implements IOReadableWritable {
}
/**
- * Returns the configuration object distributed among the task managers
- * before they start processing this job.
- *
- * @return the configuration object for the task managers, or <code>null</code> if it is not set
- */
- public Configuration getTaskmanagerConfiguration() {
-
- return this.taskManagerConfiguration;
- }
-
- /**
* Adds a new input vertex to the job graph if it is not already included.
*
* @param inputVertex
* the new input vertex to be added
*/
- public void addVertex(final AbstractJobInputVertex inputVertex) {
-
+ public void addVertex(AbstractJobInputVertex inputVertex) {
if (!inputVertices.containsKey(inputVertex.getID())) {
inputVertices.put(inputVertex.getID(), inputVertex);
}
@@ -163,8 +144,7 @@ public class JobGraph implements IOReadableWritable {
* @param taskVertex
* the new task vertex to be added
*/
- public void addVertex(final JobTaskVertex taskVertex) {
-
+ public void addVertex(JobTaskVertex taskVertex) {
if (!taskVertices.containsKey(taskVertex.getID())) {
taskVertices.put(taskVertex.getID(), taskVertex);
}
@@ -176,8 +156,7 @@ public class JobGraph implements IOReadableWritable {
* @param outputVertex
* the new output vertex to be added
*/
- public void addVertex(final AbstractJobOutputVertex outputVertex) {
-
+ public void addVertex(AbstractJobOutputVertex outputVertex) {
if (!outputVertices.containsKey(outputVertex.getID())) {
outputVertices.put(outputVertex.getID(), outputVertex);
}
@@ -570,9 +549,6 @@ public class JobGraph implements IOReadableWritable {
// Re-instantiate the job configuration object and read the configuration
this.jobConfiguration = new Configuration(cl);
this.jobConfiguration.read(in);
-
- // Read the task manager configuration
- this.taskManagerConfiguration.read(in);
}
@@ -610,7 +586,6 @@ public class JobGraph implements IOReadableWritable {
// Write out configuration objects
this.jobConfiguration.write(out);
- this.taskManagerConfiguration.write(out);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
index 29f98d9..bf8f544 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobInputVertex.java
@@ -13,41 +13,21 @@
package eu.stratosphere.nephele.jobgraph;
+import java.io.IOException;
+
import eu.stratosphere.api.common.io.InputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
-import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.InputSplit;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
-import java.io.DataInput;
-import java.io.IOException;
-
public class JobInputVertex extends AbstractJobInputVertex {
- /**
- * Input format associated to this JobInputVertex. It is either directly set or reconstructed from the task
- * configuration. Every job input vertex requires an input format to compute the input splits and the input split
- * type.
- */
- private volatile InputFormat<?, ? extends InputSplit> inputFormat = null;
- /**
- * Creates a new job input vertex with the specified name.
- *
- * @param name
- * The name of the new job file input vertex.
- * @param id
- * The ID of this vertex.
- * @param jobGraph
- * The job graph this vertex belongs to.
- */
- public JobInputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
+ private InputFormat<?, ?> inputFormat;
+
+ public JobInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
super(name, id, jobGraph);
}
-
+
/**
* Creates a new job file input vertex with the specified name.
*
@@ -56,8 +36,8 @@ public class JobInputVertex extends AbstractJobInputVertex {
* @param jobGraph
* The job graph this vertex belongs to.
*/
- public JobInputVertex(final String name, final JobGraph jobGraph) {
- super(name, null, jobGraph);
+ public JobInputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
}
/**
@@ -66,112 +46,23 @@ public class JobInputVertex extends AbstractJobInputVertex {
* @param jobGraph
* The job graph this vertex belongs to.
*/
- public JobInputVertex(final JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the class of the vertex's input task.
- *
- * @param inputClass
- * The class of the vertex's input task.
- */
- public void setInputClass(final Class<? extends AbstractInputTask<?>> inputClass) {
- this.invokableClass = inputClass;
- }
-
- /**
- * Returns the class of the vertex's input task.
- *
- * @return the class of the vertex's input task or <code>null</code> if no task has yet been set
- */
- @SuppressWarnings("unchecked")
- public Class<? extends AbstractInputTask<?>> getInputClass() {
- return (Class<? extends AbstractInputTask<?>>) this.invokableClass;
- }
-
- /**
- * Sets the input format and writes it to the task configuration. It extracts it from the UserCodeWrapper.
- *
- * @param inputFormatWrapper Wrapped input format
- */
- public void setInputFormat(UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> inputFormatWrapper) {
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubWrapper(inputFormatWrapper);
-
- inputFormat = inputFormatWrapper.getUserCodeObject();
- }
-
- /**
- * Sets the input format and writes it to the task configuration.
- *
- * @param inputFormat Input format
- */
- public void setInputFormat(InputFormat<?, ? extends InputSplit> inputFormat) {
- this.inputFormat = inputFormat;
-
- UserCodeWrapper<? extends InputFormat<?, ? extends InputSplit>> wrapper = new
- UserCodeObjectWrapper<InputFormat<?, ? extends InputSplit>>(inputFormat);
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubWrapper(wrapper);
+ public JobInputVertex(JobGraph jobGraph) {
+ this(null, jobGraph);
}
-
- /**
- * Sets the input format parameters.
- *
- * @param inputFormatParameters Input format parameters
- */
- public void setInputFormatParameters(Configuration inputFormatParameters){
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubParameters(inputFormatParameters);
-
- if(inputFormat == null){
- throw new RuntimeException("There is no input format set in job vertex: " + this.getID());
- }
-
- inputFormat.configure(inputFormatParameters);
+
+ public void setInputFormat(InputFormat<?, ?> format) {
+ this.inputFormat = format;
}
-
- /**
- * Sets the output serializer for the task associated to this vertex.
- *
- * @param factory Type serializer factory
- */
- public void setOutputSerializer(TypeSerializerFactory<?> factory){
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setOutputSerializer(factory);
- }
-
- /**
- * Deserializes the input format from the deserialized task configuration. It then configures the input format by
- * calling the configure method with the current configuration.
- *
- * @param input
- * @throws IOException
- */
- @Override
- public void read(final DataInput input) throws IOException{
- super.read(input);
-
- // load input format wrapper from the config
- ClassLoader cl = null;
-
- try{
- cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
+
+ public void initializeInputFormatFromTaskConfig(ClassLoader cl) {
+ TaskConfig cfg = new TaskConfig(getConfiguration());
+
+ UserCodeWrapper<InputFormat<?, ?>> wrapper = cfg.<InputFormat<?, ?>>getStubWrapper(cl);
+
+ if (wrapper != null) {
+ this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl);
+ this.inputFormat.configure(cfg.getStubParameters());
}
- catch (IOException ioe) {
- throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
- this.getJobGraph().getJobID(), ioe);
- }
-
- final Configuration config = this.getConfiguration();
- config.setClassLoader(cl);
- final TaskConfig taskConfig = new TaskConfig(config);
-
- inputFormat = taskConfig.<InputFormat<?, InputSplit>>getStubWrapper(cl).getUserCodeObject(InputFormat.class,
- cl);
-
- inputFormat.configure(taskConfig.getStubParameters());
}
/**
@@ -197,7 +88,7 @@ public class JobInputVertex extends AbstractJobInputVertex {
*/
@Override
public InputSplit[] getInputSplits(int minNumSplits) throws IOException {
- if(inputFormat == null){
+ if (inputFormat == null){
throw new RuntimeException("No input format has been set for job vertex: "+ this.getID());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
index cf937a0..abe6be9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobOutputVertex.java
@@ -14,41 +14,20 @@
package eu.stratosphere.nephele.jobgraph;
import eu.stratosphere.api.common.io.OutputFormat;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
-import java.io.DataInput;
-import java.io.IOException;
-
/**
- * A JobOutputVertex is a specific subtype of a {@link AbstractJobOutputVertex} and is designed
+ * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed
* for Nephele tasks which sink data in a not further specified way. As every job output vertex,
* a JobOutputVertex must not have any further output.
- *
*/
public class JobOutputVertex extends AbstractJobOutputVertex {
/**
* Contains the output format associated to this output vertex. It can be <pre>null</pre>.
*/
- private volatile OutputFormat<?> outputFormat = null;
+ private OutputFormat<?> outputFormat;
- /**
- * Creates a new job file output vertex with the specified name.
- *
- * @param name
- * the name of the new job file output vertex
- * @param id
- * the ID of this vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobOutputVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
- super(name, id, jobGraph);
- }
/**
* Creates a new job file output vertex with the specified name.
@@ -58,8 +37,12 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
* @param jobGraph
* the job graph this vertex belongs to
*/
- public JobOutputVertex(final String name, final JobGraph jobGraph) {
- super(name, null, jobGraph);
+ public JobOutputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ public JobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+ super(name, id, jobGraph);
}
/**
@@ -68,94 +51,21 @@ public class JobOutputVertex extends AbstractJobOutputVertex {
* @param jobGraph
* the job graph this vertex belongs to
*/
- public JobOutputVertex(final JobGraph jobGraph) {
- super(null, null, jobGraph);
- }
-
- /**
- * Sets the class of the vertex's output task.
- *
- * @param outputClass
- * The class of the vertex's output task.
- */
- public void setOutputClass(final Class<? extends AbstractOutputTask> outputClass) {
- this.invokableClass = outputClass;
+ public JobOutputVertex(JobGraph jobGraph) {
+ this(null, jobGraph);
}
-
- /**
- * Returns the class of the vertex's output task.
- *
- * @return The class of the vertex's output task or <code>null</code> if no task has yet been set.
- */
- @SuppressWarnings("unchecked")
- public Class<? extends AbstractOutputTask> getOutputClass() {
- return (Class<? extends AbstractOutputTask>) this.invokableClass;
+
+ public void setOutputFormat(OutputFormat<?> format) {
+ this.outputFormat = format;
}
-
- /**
- * Sets the output format and writes it to the task configuration.
- *
- * @param outputFormatWrapper Wrapped output format
- */
- public void setOutputFormat(UserCodeWrapper<? extends OutputFormat<?>> outputFormatWrapper){
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubWrapper(outputFormatWrapper);
- outputFormat = outputFormatWrapper.getUserCodeObject();
- }
-
- /**
- * Sets the output format and writes it to the task configuration.
- *
- * @param outputFormat Output format
- */
- public void setOutputFormat(OutputFormat<?> outputFormat){
- this.outputFormat = outputFormat;
- UserCodeWrapper<? extends OutputFormat<?>> wrapper = new UserCodeObjectWrapper<OutputFormat<?>>
- (outputFormat);
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubWrapper(wrapper);
- }
-
- /**
- * Sets the output format parameters for the output format by writing it to the task configuration.
- *
- * @param parameters Output format parameters
- */
- public void setOutputFormatParameters(Configuration parameters){
- TaskConfig config = new TaskConfig(this.getConfiguration());
- config.setStubParameters(parameters);
-
- outputFormat.configure(parameters);
- }
-
- /**
- * Deserializes the output format from the deserialized configuration if it contains an output format. The output
- * format is always stored in the stub wrapper. If the task configuration contains an output format,
- * then it is configured after deserialization.
- *
- * @param input
- * @throws IOException
- */
- @Override
- public void read(final DataInput input) throws IOException{
- super.read(input);
-
- ClassLoader cl = null;
- try{
- cl = LibraryCacheManager.getClassLoader(this.getJobGraph().getJobID());
- }
- catch (IOException ioe) {
- throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
- this.getJobGraph().getJobID(), ioe);
- }
-
- final Configuration config = this.getConfiguration();
- config.setClassLoader(cl);
- final TaskConfig taskConfig = new TaskConfig(config);
-
- if(taskConfig.hasStubWrapper()){
- outputFormat = taskConfig.<OutputFormat<?> >getStubWrapper(cl).getUserCodeObject(OutputFormat.class,cl);
- outputFormat.configure(taskConfig.getStubParameters());
+
+ public void initializeOutputFormatFromTaskConfig(ClassLoader cl) {
+ TaskConfig cfg = new TaskConfig(getConfiguration());
+ UserCodeWrapper<OutputFormat<?>> wrapper = cfg.<OutputFormat<?>>getStubWrapper(cl);
+
+ if (wrapper != null) {
+ this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl);
+ this.outputFormat.configure(cfg.getStubParameters());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
index 8672aeb..d16286c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobTaskVertex.java
@@ -13,8 +13,6 @@
package eu.stratosphere.nephele.jobgraph;
-import eu.stratosphere.nephele.template.AbstractTask;
-
/**
* A JobTaskVertex is the vertex type for regular tasks (with both input and output) in Nephele.
* Tasks running inside a JobTaskVertex must specify at least one record reader and one record writer.
@@ -27,28 +25,15 @@ public class JobTaskVertex extends AbstractJobVertex {
*
* @param name
* the name for the new job task vertex
- * @param id
- * the ID of this vertex
* @param jobGraph
* the job graph this vertex belongs to
*/
- public JobTaskVertex(final String name, final JobVertexID id, final JobGraph jobGraph) {
- super(name, id, jobGraph);
-
- jobGraph.addVertex(this);
+ public JobTaskVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
}
-
- /**
- * Creates a new job task vertex with the specified name.
- *
- * @param name
- * the name for the new job task vertex
- * @param jobGraph
- * the job graph this vertex belongs to
- */
- public JobTaskVertex(final String name, final JobGraph jobGraph) {
- super(name, null, jobGraph);
-
+
+ public JobTaskVertex(String name, JobVertexID id, JobGraph jobGraph) {
+ super(name, id, jobGraph);
jobGraph.addVertex(this);
}
@@ -58,29 +43,7 @@ public class JobTaskVertex extends AbstractJobVertex {
* @param jobGraph
* the job graph this vertex belongs to
*/
- public JobTaskVertex(final JobGraph jobGraph) {
- super(null, null, jobGraph);
-
- jobGraph.addVertex(this);
- }
-
- /**
- * Sets the class of the vertex's task.
- *
- * @param taskClass
- * the class of the vertex's task
- */
- public void setTaskClass(final Class<? extends AbstractTask> taskClass) {
- this.invokableClass = taskClass;
- }
-
- /**
- * Returns the class of the vertex's task.
- *
- * @return the class of the vertex's task or <code>null</code> if the class has not yet been set
- */
- @SuppressWarnings("unchecked")
- public Class<? extends AbstractTask> getTaskClass() {
- return (Class<? extends AbstractTask>) this.invokableClass;
+ public JobTaskVertex(JobGraph jobGraph) {
+ this(null, jobGraph);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 8a3cba4..f3cf3a3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -393,9 +393,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
try {
// First check if job is null
if (job == null) {
- JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR,
- "Submitted job is null!");
- return result;
+ return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!");
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
index 790aca9..da63bf2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitManager.java
@@ -31,8 +31,6 @@ import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertexIterator;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobmanager.splitassigner.file.FileInputSplitAssigner;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.util.StringUtils;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 1e6929d..dc52911 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -24,8 +24,6 @@ import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.core.io.LocatableInputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
/**
* The locatable input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index 048562c..3580fda 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -25,8 +25,6 @@ import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.nephele.executiongraph.ExecutionGroupVertex;
import eu.stratosphere.nephele.executiongraph.ExecutionVertex;
import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitAssigner;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractInvokable;
/**
* The file input split assigner is a specific implementation of the {@link InputSplitAssigner} interface for
@@ -89,14 +87,11 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
}
}
-
@Override
public void unregisterGroupVertex(final ExecutionGroupVertex groupVertex) {
-
this.vertexMap.remove(groupVertex);
}
-
@Override
public InputSplit getNextInputSplit(final ExecutionVertex vertex) {