You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:46:40 UTC
[15/53] [abbrv] Removed RuntimeEnvironment instantiation from
execution graph construction. Removed legacy job vertex classes and
input/output tasks.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index 82359f5..cbe1766 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -351,64 +351,6 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
}
}
-
- // ------------------------------------------------------------------------
- // Degree of parallelism & checks
- // ------------------------------------------------------------------------
-
-
- @Override
- public int getMaximumNumberOfSubtasks() {
- if (!(this.format instanceof FileOutputFormat<?>)) {
- return -1;
- }
-
- final FileOutputFormat<?> fileOutputFormat = (FileOutputFormat<?>) this.format;
-
- // ----------------- This code applies only to file inputs ------------------
-
- final Path path = fileOutputFormat.getOutputFilePath();
- final WriteMode writeMode = fileOutputFormat.getWriteMode();
- final OutputDirectoryMode outDirMode = fileOutputFormat.getOutputDirectoryMode();
-
- // Prepare output path and determine max DOP
- try {
-
- int dop = getTaskConfiguration().getInteger(DEGREE_OF_PARALLELISM_KEY, -1);
- final FileSystem fs = path.getFileSystem();
-
- if(dop == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
- // output is not written in parallel and should be written to a single file.
-
- if(fs.isDistributedFS()) {
- // prepare distributed output path
- if(!fs.initOutPathDistFS(path, writeMode, false)) {
- // output preparation failed! Cancel task.
- throw new IOException("Output path could not be initialized.");
- }
- }
-
- return 1;
-
- } else {
- // output should be written to a directory
-
- if(fs.isDistributedFS()) {
- // only distributed file systems can be initialized at start-up time.
- if(!fs.initOutPathDistFS(path, writeMode, true)) {
- throw new IOException("Output directory could not be created.");
- }
- }
-
- return -1;
-
- }
- }
- catch (IOException e) {
- LOG.error("Could not access the file system to detemine the status of the output.", e);
- throw new RuntimeException("I/O Error while accessing file", e);
- }
- }
// ------------------------------------------------------------------------
// Utilities
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
index af176b9..f835ace 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
@@ -78,27 +78,12 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
@Override
public void registerInputOutput()
{
- if (LOG.isDebugEnabled()) {
+ initInputFormat();
+
+ if (LOG.isDebugEnabled()) {
LOG.debug(getLogString("Start registering input and output"));
}
- if (this.userCodeClassLoader == null) {
- try {
- this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
- }
- catch (IOException ioe) {
- throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
- getEnvironment().getJobID(), ioe);
- }
- }
-
- // obtain task configuration (including stub parameters)
- Configuration taskConf = getTaskConfiguration();
- taskConf.setClassLoader(this.userCodeClassLoader);
- this.config = new TaskConfig(taskConf);
-
- initInputFormat(this.userCodeClassLoader);
-
try {
initOutputs(this.userCodeClassLoader);
} catch (Exception ex) {
@@ -301,17 +286,42 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
/**
* Initializes the InputFormat implementation and configuration.
- *
+l *
* @throws RuntimeException
* Throws if instance of InputFormat implementation can not be
* obtained.
*/
- private void initInputFormat(ClassLoader cl) {
- // instantiate the stub
- @SuppressWarnings("unchecked")
- Class<InputFormat<OT, InputSplit>> superClass = (Class<InputFormat<OT, InputSplit>>) (Class<?>) InputFormat.class;
- this.format = RegularPactTask.instantiateUserCode(this.config, cl, superClass);
-
+ private void initInputFormat() {
+ if (this.userCodeClassLoader == null) {
+ try {
+ this.userCodeClassLoader = LibraryCacheManager.getClassLoader(getEnvironment().getJobID());
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException("Usercode ClassLoader could not be obtained for job: " +
+ getEnvironment().getJobID(), ioe);
+ }
+ }
+
+ // obtain task configuration (including stub parameters)
+ Configuration taskConf = getTaskConfiguration();
+ taskConf.setClassLoader(this.userCodeClassLoader);
+ this.config = new TaskConfig(taskConf);
+
+ try {
+ this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
+ .getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
+
+ // check if the class is a subclass, if the check is required
+ if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
+ throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+ InputFormat.class.getName() + "' as is required.");
+ }
+ }
+ catch (ClassCastException ccex) {
+ throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(),
+ ccex);
+ }
+
// configure the stub. catch exceptions here extra, to report them as originating from the user code
try {
this.format.configure(this.config.getStubParameters());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
index a43f8cc..2eb003d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/TaskConfig.java
@@ -246,6 +246,10 @@ public class TaskConfig {
public String getTaskName() {
return this.config.getString(TASK_NAME, null);
}
+
+ public boolean hasStubWrapper() {
+ return this.config.containsKey(STUB_OBJECT);
+ }
public void setStubWrapper(UserCodeWrapper<?> wrapper) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
index 7000667..dc2e605 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
@@ -19,12 +19,12 @@ import static org.junit.Assert.fail;
import java.io.IOException;
+import eu.stratosphere.pact.runtime.task.RegularPactTask;
import org.junit.Test;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.util.FileLineReader;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.util.SerializableArrayList;
@@ -50,7 +50,7 @@ public class TaskDeploymentDescriptorTest {
final int currentNumberOfSubtasks = 1;
final Configuration jobConfiguration = new Configuration();
final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+ final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
0);
final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
@@ -85,7 +85,7 @@ public class TaskDeploymentDescriptorTest {
final int currentNumberOfSubtasks = 1;
final Configuration jobConfiguration = new Configuration();
final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+ final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
0);
final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
@@ -239,7 +239,7 @@ public class TaskDeploymentDescriptorTest {
final int currentNumberOfSubtasks = 1;
final Configuration jobConfiguration = new Configuration();
final Configuration taskConfiguration = new Configuration();
- final Class<? extends AbstractInvokable> invokableClass = FileLineReader.class;
+ final Class<? extends AbstractInvokable> invokableClass = RegularPactTask.class;
final SerializableArrayList<GateDeploymentDescriptor> outputGates = new SerializableArrayList<GateDeploymentDescriptor>(
0);
final SerializableArrayList<GateDeploymentDescriptor> inputGates = new SerializableArrayList<GateDeploymentDescriptor>(
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
deleted file mode 100644
index 1ce23e6..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.executiongraph;
-
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-
-/**
- * This class represents the data source in the self cross unit test.
- *
- */
-public class SelfCrossInputTask extends AbstractFileInputTask {
-
-
- @Override
- public void registerInputOutput() {
-
- new RecordWriter<StringRecord>(this);
- new RecordWriter<StringRecord>(this);
- }
-
-
- @Override
- public void invoke() throws Exception {
-
- // Nothing to do here
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
deleted file mode 100644
index 1e2be47..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleSourceTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.jobmanager;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-public class DoubleSourceTask extends AbstractFileInputTask {
-
- private RecordWriter<StringRecord> output1 = null;
-
- private RecordWriter<StringRecord> output2 = null;
-
- @Override
- public void invoke() throws Exception {
- this.output1.initializeSerializers();
- this.output2.initializeSerializers();
-
- final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- final long start = split.getStart();
- final long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output1.emit(str);
- output2.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
-
- this.output1.flush();
- this.output2.flush();
- }
-
- @Override
- public void registerInputOutput() {
- this.output1 = new RecordWriter<StringRecord>(this);
- this.output2 = new RecordWriter<StringRecord>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
index f0ca435..a1ce0b2 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/DoubleTargetTask.java
@@ -13,18 +13,18 @@
package eu.stratosphere.nephele.jobmanager;
-import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
public class DoubleTargetTask extends AbstractTask {
- private RecordReader<StringRecord> input1 = null;
+ private RecordReader<Record> input1 = null;
- private RecordReader<StringRecord> input2 = null;
+ private RecordReader<Record> input2 = null;
- private RecordWriter<StringRecord> output = null;
+ private RecordWriter<Record> output = null;
@Override
public void invoke() throws Exception {
@@ -33,13 +33,13 @@ public class DoubleTargetTask extends AbstractTask {
while (this.input1.hasNext()) {
- StringRecord s = input1.next();
+ Record s = input1.next();
this.output.emit(s);
}
while (this.input2.hasNext()) {
- StringRecord s = input2.next();
+ Record s = input2.next();
this.output.emit(s);
}
@@ -49,9 +49,9 @@ public class DoubleTargetTask extends AbstractTask {
@Override
public void registerInputOutput() {
- this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
- this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this);
+ this.input1 = new RecordReader<Record>(this, Record.class);
+ this.input2 = new RecordReader<Record>(this, Record.class);
+ this.output = new RecordWriter<Record>(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
index 96be668..377e304 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/ForwardTask.java
@@ -13,15 +13,15 @@
package eu.stratosphere.nephele.jobmanager;
-import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
public class ForwardTask extends AbstractTask {
- private RecordReader<StringRecord> input = null;
- private RecordWriter<StringRecord> output = null;
+ private RecordReader<Record> input = null;
+ private RecordWriter<Record> output = null;
@Override
public void invoke() throws Exception {
@@ -30,7 +30,7 @@ public class ForwardTask extends AbstractTask {
while (this.input.hasNext()) {
- StringRecord s = input.next();
+ Record s = input.next();
this.output.emit(s);
}
@@ -39,7 +39,7 @@ public class ForwardTask extends AbstractTask {
@Override
public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- this.output = new RecordWriter<StringRecord>(this);
+ this.input = new RecordReader<Record>(this, Record.class);
+ this.output = new RecordWriter<Record>(this);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
index 124a24d..209eff1 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/UnionTask.java
@@ -18,6 +18,7 @@ import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.runtime.io.api.UnionRecordReader;
import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.types.Record;
/**
* A simple implementation of a task using a {@link UnionRecordReader}.
@@ -27,21 +28,22 @@ public class UnionTask extends AbstractTask {
/**
* The union record reader to be used during the tests.
*/
- private UnionRecordReader<StringRecord> unionReader;
+ private UnionRecordReader<Record> unionReader;
- private RecordWriter<StringRecord> writer;
+ private RecordWriter<Record> writer;
@Override
public void registerInputOutput() {
@SuppressWarnings("unchecked")
- MutableRecordReader<StringRecord>[] recordReaders = (MutableRecordReader<StringRecord>[]) new MutableRecordReader<?>[2];
- recordReaders[0] = new MutableRecordReader<StringRecord>(this);
- recordReaders[1] = new MutableRecordReader<StringRecord>(this);
- this.unionReader = new UnionRecordReader<StringRecord>(recordReaders, StringRecord.class);
+ MutableRecordReader<Record>[] recordReaders = (MutableRecordReader<Record>[]) new
+ MutableRecordReader<?>[2];
+ recordReaders[0] = new MutableRecordReader<Record>(this);
+ recordReaders[1] = new MutableRecordReader<Record>(this);
+ this.unionReader = new UnionRecordReader<Record>(recordReaders, Record.class);
- this.writer = new RecordWriter<StringRecord>(this);
+ this.writer = new RecordWriter<Record>(this);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
index c8bcddc..6a41fe9 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/scheduler/queue/DefaultSchedulerTest.java
@@ -95,6 +95,47 @@ public class DefaultSchedulerTest {
}
+ public static final class DummyInputFormat extends GenericInputFormat<IntValue> {
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return true;
+ }
+
+ @Override
+ public IntValue nextRecord(IntValue reuse) throws IOException {
+ return null;
+ }
+ }
+
+ public static final class DummyOutputFormat implements OutputFormat<IntValue> {
+
+ @Override
+ public void configure(Configuration parameters) {
+
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+
+ }
+
+ @Override
+ public void writeRecord(IntValue record) throws IOException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void initialize(Configuration configuration) {
+
+ }
+ }
+
/**
* Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
*
@@ -108,10 +149,12 @@ public class DefaultSchedulerTest {
final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
inputVertex.setInputClass(InputTask.class);
+ inputVertex.setInputFormat(new DummyInputFormat());
inputVertex.setNumberOfSubtasks(1);
final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
outputVertex.setOutputClass(OutputTask.class);
+ outputVertex.setOutputFormat(new DummyOutputFormat());
outputVertex.setNumberOfSubtasks(1);
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
deleted file mode 100644
index fcb4fa1..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineReader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import java.util.Iterator;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.template.AbstractFileInputTask;
-import eu.stratosphere.runtime.fs.LineReader;
-
-/**
- * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
- *
- */
-public class FileLineReader extends AbstractFileInputTask {
-
- private RecordWriter<StringRecord> output = null;
-
- @Override
- public void invoke() throws Exception {
-
- output.initializeSerializers();
-
- final Iterator<FileInputSplit> splitIterator = getFileInputSplits();
-
- while (splitIterator.hasNext()) {
-
- final FileInputSplit split = splitIterator.next();
-
- long start = split.getStart();
- long length = split.getLength();
-
- final FileSystem fs = FileSystem.get(split.getPath().toUri());
-
- final FSDataInputStream fdis = fs.open(split.getPath());
-
- final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
-
- byte[] line = lineReader.readLine();
-
- while (line != null) {
-
- // Create a string object from the data read
- StringRecord str = new StringRecord();
- str.set(line);
-
- // Send out string
- output.emit(str);
-
- line = lineReader.readLine();
- }
-
- // Close the stream;
- lineReader.close();
- }
-
- this.output.flush();
- }
-
- @Override
- public void registerInputOutput() {
- output = new RecordWriter<StringRecord>(this);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
deleted file mode 100644
index bc738df..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/FileLineWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import eu.stratosphere.core.fs.FSDataOutputStream;
-import eu.stratosphere.core.fs.FileStatus;
-import eu.stratosphere.core.fs.FileSystem;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.nephele.template.AbstractFileOutputTask;
-
-/**
- * A file line writer reads string records its input gate and writes them to the associated output file.
- *
- */
-public class FileLineWriter extends AbstractFileOutputTask {
-
- /**
- * The record reader through which incoming string records are received.
- */
- private RecordReader<StringRecord> input = null;
-
-
- @Override
- public void invoke() throws Exception {
-
- Path outputPath = getFileOutputPath();
-
- FileSystem fs = FileSystem.get(outputPath.toUri());
- if (fs.exists(outputPath)) {
- FileStatus status = fs.getFileStatus(outputPath);
-
- if (status.isDir()) {
- outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
- }
- }
-
- final FSDataOutputStream outputStream = fs.create(outputPath, true);
-
- while (this.input.hasNext()) {
-
- StringRecord record = this.input.next();
- byte[] recordByte = (record.toString() + "\r\n").getBytes();
- outputStream.write(recordByte, 0, recordByte.length);
- }
-
- outputStream.close();
-
- }
-
-
- @Override
- public void registerInputOutput() {
- this.input = new RecordReader<StringRecord>(this, StringRecord.class);
- }
-
-
- @Override
- public int getMaximumNumberOfSubtasks() {
- // The default implementation always returns -1
- return -1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
deleted file mode 100644
index 17c2f58..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/library/FileLineReadWriteTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.runtime.io.library;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
-import java.io.File;
-import java.io.IOException;
-
-import eu.stratosphere.runtime.io.api.RecordWriter;
-import eu.stratosphere.nephele.util.FileLineReader;
-import eu.stratosphere.nephele.util.FileLineWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.core.fs.FileInputSplit;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.runtime.io.api.RecordReader;
-import eu.stratosphere.nephele.template.InputSplitProvider;
-
-/**
- * This class checks the functionality of the {@link eu.stratosphere.nephele.util.FileLineReader} and the {@link eu.stratosphere.nephele.util.FileLineWriter} class.
- *
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(FileLineReader.class)
-public class FileLineReadWriteTest {
-
- @Mock
- private Environment environment;
-
- @Mock
- private Configuration conf;
-
- @Mock
- private RecordReader<StringRecord> recordReader;
-
- @Mock
- private RecordWriter<StringRecord> recordWriter;
-
- @Mock
- private InputSplitProvider inputSplitProvider;
-
- private File file = new File("./tmp");
-
- /**
- * Set up mocks
- *
- * @throws IOException
- */
- @Before
- public void before() throws Exception {
-
- MockitoAnnotations.initMocks(this);
- }
-
- /**
- * remove the temporary file
- */
- @After
- public void after() {
- this.file.delete();
- }
-
- /**
- * Tests the read and write methods
- *
- * @throws Exception
- */
- @Test
- public void testReadWrite() throws Exception {
-
- this.file.createNewFile();
- FileLineWriter writer = new FileLineWriter();
- Whitebox.setInternalState(writer, "environment", this.environment);
- Whitebox.setInternalState(writer, "input", this.recordReader);
- when(this.environment.getTaskConfiguration()).thenReturn(this.conf);
-
- when(this.conf.getString("outputPath", null)).thenReturn(this.file.toURI().toString());
- when(this.recordReader.hasNext()).thenReturn(true, true, true, false);
- StringRecord in = new StringRecord("abc");
- try {
- when(this.recordReader.next()).thenReturn(in);
- } catch (IOException e) {
- fail();
- e.printStackTrace();
- } catch (InterruptedException e) {
- fail();
- e.printStackTrace();
- }
- writer.invoke();
-
- final FileInputSplit split = new FileInputSplit(0, new Path(this.file.toURI().toString()), 0,
- this.file.length(), null);
- when(this.environment.getInputSplitProvider()).thenReturn(this.inputSplitProvider);
- when(this.inputSplitProvider.getNextInputSplit()).thenReturn(split, (FileInputSplit) null);
-
- FileLineReader reader = new FileLineReader();
- Whitebox.setInternalState(reader, "environment", this.environment);
- Whitebox.setInternalState(reader, "output", this.recordWriter);
- StringRecord record = mock(StringRecord.class);
-
- whenNew(StringRecord.class).withNoArguments().thenReturn(record);
-
- reader.invoke();
-
- // verify the correct bytes have been written and read
- verify(record, times(3)).set(in.getBytes());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ea79186b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
index 57253d1..3de547e 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
@@ -44,4 +44,7 @@ public class DiscardingOutputFormat implements OutputFormat<Record> {
@Override
public void close() throws IOException
{}
+
+ @Override
+ public void initialize(Configuration configuration){}
}