You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:46:44 UTC
[19/53] [abbrv] Merge fix to omit input/output registering on
JobManager Rework Invokable Task Hierarchy
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
new file mode 100644
index 0000000..aa46af8
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/DoubleSourceTask.java
@@ -0,0 +1,134 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.fs.LineReader;
+
+public class DoubleSourceTask extends AbstractInvokable {
+
+ private RecordWriter<StringRecord> output1 = null;
+
+ private RecordWriter<StringRecord> output2 = null;
+
+ @Override
+ public void invoke() throws Exception {
+ this.output1.initializeSerializers();
+ this.output2.initializeSerializers();
+
+ final Iterator<FileInputSplit> splitIterator = getInputSplits();
+
+ while (splitIterator.hasNext()) {
+
+ final FileInputSplit split = splitIterator.next();
+
+ final long start = split.getStart();
+ final long length = split.getLength();
+
+ final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+ final FSDataInputStream fdis = fs.open(split.getPath());
+
+ final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+ byte[] line = lineReader.readLine();
+
+ while (line != null) {
+
+ // Create a string object from the data read
+ StringRecord str = new StringRecord();
+ str.set(line);
+
+ // Send out string
+ output1.emit(str);
+ output2.emit(str);
+
+ line = lineReader.readLine();
+ }
+
+ // Close the stream;
+ lineReader.close();
+ }
+
+ this.output1.flush();
+ this.output2.flush();
+ }
+
+ @Override
+ public void registerInputOutput() {
+ this.output1 = new RecordWriter<StringRecord>(this);
+ this.output2 = new RecordWriter<StringRecord>(this);
+ }
+
+ private Iterator<FileInputSplit> getInputSplits() {
+
+ final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+ return new Iterator<FileInputSplit>() {
+
+ private FileInputSplit nextSplit;
+
+ private boolean exhausted;
+
+ @Override
+ public boolean hasNext() {
+ if (exhausted) {
+ return false;
+ }
+
+ if (nextSplit != null) {
+ return true;
+ }
+
+ FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
+
+ if (split != null) {
+ this.nextSplit = split;
+ return true;
+ }
+ else {
+ exhausted = true;
+ return false;
+ }
+ }
+
+ @Override
+ public FileInputSplit next() {
+ if (this.nextSplit == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final FileInputSplit tmp = this.nextSplit;
+ this.nextSplit = null;
+ return tmp;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
new file mode 100644
index 0000000..c62911a
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineReader.java
@@ -0,0 +1,133 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import eu.stratosphere.core.fs.FSDataInputStream;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.InputSplitProvider;
+import eu.stratosphere.runtime.fs.LineReader;
+
+/**
+ * A file line reader reads the associated file input splits line by line and outputs the lines as string records.
+ *
+ */
+public class FileLineReader extends AbstractInvokable {
+
+ private RecordWriter<StringRecord> output = null;
+
+ @Override
+ public void invoke() throws Exception {
+
+ output.initializeSerializers();
+
+ final Iterator<FileInputSplit> splitIterator = getInputSplits();
+
+ while (splitIterator.hasNext()) {
+
+ final FileInputSplit split = splitIterator.next();
+
+ long start = split.getStart();
+ long length = split.getLength();
+
+ final FileSystem fs = FileSystem.get(split.getPath().toUri());
+
+ final FSDataInputStream fdis = fs.open(split.getPath());
+
+ final LineReader lineReader = new LineReader(fdis, start, length, (1024 * 1024));
+
+ byte[] line = lineReader.readLine();
+
+ while (line != null) {
+
+ // Create a string object from the data read
+ StringRecord str = new StringRecord();
+ str.set(line);
+
+ // Send out string
+ output.emit(str);
+
+ line = lineReader.readLine();
+ }
+
+ // Close the stream;
+ lineReader.close();
+ }
+
+ this.output.flush();
+ }
+
+ @Override
+ public void registerInputOutput() {
+ output = new RecordWriter<StringRecord>(this);
+ }
+
+ private Iterator<FileInputSplit> getInputSplits() {
+
+ final InputSplitProvider provider = getEnvironment().getInputSplitProvider();
+
+ return new Iterator<FileInputSplit>() {
+
+ private FileInputSplit nextSplit;
+
+ private boolean exhausted;
+
+ @Override
+ public boolean hasNext() {
+ if (exhausted) {
+ return false;
+ }
+
+ if (nextSplit != null) {
+ return true;
+ }
+
+ FileInputSplit split = (FileInputSplit) provider.getNextInputSplit();
+
+ if (split != null) {
+ this.nextSplit = split;
+ return true;
+ }
+ else {
+ exhausted = true;
+ return false;
+ }
+ }
+
+ @Override
+ public FileInputSplit next() {
+ if (this.nextSplit == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ final FileInputSplit tmp = this.nextSplit;
+ this.nextSplit = null;
+ return tmp;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
new file mode 100644
index 0000000..5f6e2b2
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/FileLineWriter.java
@@ -0,0 +1,72 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.core.fs.FSDataOutputStream;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.runtime.io.api.RecordReader;
+
+/**
+ * A file line writer reads string records its input gate and writes them to the associated output file.
+ *
+ */
+public class FileLineWriter extends AbstractInvokable {
+ /**
+ * The record reader through which incoming string records are received.
+ */
+ private RecordReader<StringRecord> input = null;
+
+
+ @Override
+ public void invoke() throws Exception {
+
+ final Configuration conf = getEnvironment().getTaskConfiguration();
+ final String outputPathString = conf.getString(JobFileOutputVertex.PATH_PROPERTY, null);
+
+ Path outputPath = new Path(outputPathString);
+
+ FileSystem fs = FileSystem.get(outputPath.toUri());
+ if (fs.exists(outputPath)) {
+ FileStatus status = fs.getFileStatus(outputPath);
+
+ if (status.isDir()) {
+ outputPath = new Path(outputPath.toUri().toString() + "/file_" + getIndexInSubtaskGroup() + ".txt");
+ }
+ }
+
+ final FSDataOutputStream outputStream = fs.create(outputPath, true);
+
+ while (this.input.hasNext()) {
+
+ StringRecord record = this.input.next();
+ byte[] recordByte = (record.toString() + "\r\n").getBytes();
+ outputStream.write(recordByte, 0, recordByte.length);
+ }
+
+ outputStream.close();
+
+ }
+
+ @Override
+ public void registerInputOutput() {
+ this.input = new RecordReader<StringRecord>(this, StringRecord.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
new file mode 100644
index 0000000..fb0da91
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileInputVertex.java
@@ -0,0 +1,255 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.stratosphere.core.fs.BlockLocation;
+import eu.stratosphere.core.fs.FileInputSplit;
+import eu.stratosphere.core.fs.FileStatus;
+import eu.stratosphere.core.fs.FileSystem;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.jobgraph.AbstractJobInputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+
+
+public final class JobFileInputVertex extends AbstractJobInputVertex {
+
+ /**
+ * The fraction that the last split may be larger than the others.
+ */
+ private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
+
+ /**
+ * The path pointing to the input file/directory.
+ */
+ private Path path;
+
+
+ public JobFileInputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+ super(name, id, jobGraph);
+ }
+
+ /**
+ * Creates a new job file input vertex with the specified name.
+ *
+ * @param name
+ * the name of the new job file input vertex
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ public JobFileInputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ /**
+ * Creates a new job file input vertex.
+ *
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ public JobFileInputVertex(JobGraph jobGraph) {
+ this(null, jobGraph);
+ }
+
+ /**
+ * Sets the path of the file the job file input vertex's task should read from.
+ *
+ * @param path
+ * the path of the file the job file input vertex's task should read from
+ */
+ public void setFilePath(final Path path) {
+ this.path = path;
+ }
+
+ /**
+ * Returns the path of the file the job file input vertex's task should read from.
+ *
+ * @return the path of the file the job file input vertex's task should read from or <code>null</code> if no path
+ * has yet been set
+ */
+ public Path getFilePath() {
+ return this.path;
+ }
+
+ @Override
+ public void read(final DataInput in) throws IOException {
+ super.read(in);
+
+ // Read path of the input file
+ final boolean isNotNull = in.readBoolean();
+ if (isNotNull) {
+ this.path = new Path();
+ this.path.read(in);
+ }
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ super.write(out);
+
+ // Write out the path of the input file
+ if (this.path == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ this.path.write(out);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ @Override
+ public InputSplit[] getInputSplits(int minNumSplits) throws Exception {
+ final Path path = this.path;
+ final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>();
+
+ // get all the files that are involved in the splits
+ final List<FileStatus> files = new ArrayList<FileStatus>();
+ long totalLength = 0;
+
+ final FileSystem fs = path.getFileSystem();
+ final FileStatus pathFile = fs.getFileStatus(path);
+
+ if (pathFile.isDir()) {
+ // input is directory. list all contained files
+ final FileStatus[] dir = fs.listStatus(path);
+ for (int i = 0; i < dir.length; i++) {
+ if (!dir[i].isDir()) {
+ files.add(dir[i]);
+ totalLength += dir[i].getLen();
+ }
+ }
+
+ } else {
+ files.add(pathFile);
+ totalLength += pathFile.getLen();
+ }
+
+ final long minSplitSize = 1;
+ final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits +
+ (totalLength % minNumSplits == 0 ? 0 : 1));
+
+ // now that we have the files, generate the splits
+ int splitNum = 0;
+ for (final FileStatus file : files) {
+
+ final long len = file.getLen();
+ final long blockSize = file.getBlockSize();
+
+ final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
+ final long halfSplit = splitSize >>> 1;
+
+ final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
+
+ if (len > 0) {
+
+ // get the block locations and make sure they are in order with respect to their offset
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+ Arrays.sort(blocks);
+
+ long bytesUnassigned = len;
+ long position = 0;
+
+ int blockIndex = 0;
+
+ while (bytesUnassigned > maxBytesForLastSplit) {
+ // get the block containing the majority of the data
+ blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
+ // create a new split
+ final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
+ blocks[blockIndex]
+ .getHosts());
+ inputSplits.add(fis);
+
+ // adjust the positions
+ position += splitSize;
+ bytesUnassigned -= splitSize;
+ }
+
+ // assign the last split
+ if (bytesUnassigned > 0) {
+ blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
+ final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
+ bytesUnassigned,
+ blocks[blockIndex].getHosts());
+ inputSplits.add(fis);
+ }
+ } else {
+ // special case with a file of zero bytes size
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
+ String[] hosts;
+ if (blocks.length > 0) {
+ hosts = blocks[0].getHosts();
+ } else {
+ hosts = new String[0];
+ }
+ final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
+ inputSplits.add(fis);
+ }
+ }
+
+ return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
+ }
+
+ /**
+ * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
+ * offset.
+ *
+ * @param blocks
+ * The different blocks of the file. Must be ordered by their offset.
+ * @param offset
+ * The offset of the position in the file.
+ * @param startIndex
+ * The earliest index to look at.
+ * @return The index of the block containing the given position.
+ */
+ private final int getBlockIndexForPosition(final BlockLocation[] blocks, final long offset,
+ final long halfSplitSize, final int startIndex) {
+
+ // go over all indexes after the startIndex
+ for (int i = startIndex; i < blocks.length; i++) {
+ long blockStart = blocks[i].getOffset();
+ long blockEnd = blockStart + blocks[i].getLength();
+
+ if (offset >= blockStart && offset < blockEnd) {
+ // got the block where the split starts
+ // check if the next block contains more than this one does
+ if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
+ return i + 1;
+ } else {
+ return i;
+ }
+ }
+ }
+ throw new IllegalArgumentException("The given offset is not contained in the any block.");
+ }
+
+
+ @Override
+ public Class<FileInputSplit> getInputSplitType() {
+ return FileInputSplit.class;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
new file mode 100644
index 0000000..593b520
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/tasks/JobFileOutputVertex.java
@@ -0,0 +1,109 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util.tasks;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.nephele.jobgraph.AbstractJobOutputVertex;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.nephele.jobgraph.JobVertexID;
+
+
+public class JobFileOutputVertex extends AbstractJobOutputVertex {
+
+ public static final String PATH_PROPERTY = "outputPath";
+
+ /**
+ * The path pointing to the output file/directory.
+ */
+ private Path path;
+
+
+ public JobFileOutputVertex(String name, JobVertexID id, JobGraph jobGraph) {
+ super(name, id, jobGraph);
+ }
+
+ /**
+ * Creates a new job file output vertex with the specified name.
+ *
+ * @param name
+ * the name of the new job file output vertex
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ public JobFileOutputVertex(String name, JobGraph jobGraph) {
+ this(name, null, jobGraph);
+ }
+
+ /**
+ * Creates a new job file input vertex.
+ *
+ * @param jobGraph
+ * the job graph this vertex belongs to
+ */
+ public JobFileOutputVertex(JobGraph jobGraph) {
+ this(null, jobGraph);
+ }
+
+ /**
+ * Sets the path of the file the job file input vertex's task should write to.
+ *
+ * @param path
+ * the path of the file the job file input vertex's task should write to
+ */
+ public void setFilePath(Path path) {
+ this.path = path;
+ getConfiguration().setString(PATH_PROPERTY, path.toString());
+ }
+
+ /**
+ * Returns the path of the file the job file output vertex's task should write to.
+ *
+ * @return the path of the file the job file output vertex's task should write to or <code>null</code> if no path
+ * has yet been set
+ */
+ public Path getFilePath() {
+ return this.path;
+ }
+
+ @Override
+ public void read(final DataInput in) throws IOException {
+ super.read(in);
+
+ // Read path of the input file
+ boolean isNotNull = in.readBoolean();
+ if (isNotNull) {
+ this.path = new Path();
+ this.path.read(in);
+ }
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ super.write(out);
+
+ // Write out the path of the input file
+ if (this.path == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ this.path.write(out);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
index a28ba38..e59f4a6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/HashMatchIteratorITCase.java
@@ -34,7 +34,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordPairComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
@@ -67,7 +67,7 @@ public class HashMatchIteratorITCase {
private static final long SEED1 = 561349061987311L;
private static final long SEED2 = 231434613412342L;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
private MemoryManager memoryManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
index d9c8b08..755d08a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/hash/ReOpenableHashTableITCase.java
@@ -38,7 +38,6 @@ import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.hash.HashMatchIteratorITCase.RecordMatch;
import eu.stratosphere.pact.runtime.hash.HashMatchIteratorITCase.RecordMatchRemovingJoin;
import eu.stratosphere.pact.runtime.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
@@ -75,7 +74,7 @@ public class ReOpenableHashTableITCase {
private static final int NUM_PROBES = 3; // number of reopenings of hash join
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
private MemoryManager memoryManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
index fbe4f5b..c2be01a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/ChannelViewsTest.java
@@ -31,7 +31,7 @@ import eu.stratosphere.nephele.services.iomanager.ChannelWriterOutputView;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
import eu.stratosphere.pact.runtime.test.util.TestData;
import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
@@ -63,7 +63,7 @@ public class ChannelViewsTest
private static final int NUM_MEMORY_SEGMENTS = 3;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
index 1809540..c960280 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/io/SpillingBufferTest.java
@@ -28,7 +28,7 @@ import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.ListMemorySegmentSource;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
import eu.stratosphere.pact.runtime.test.util.TestData;
import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
@@ -54,7 +54,7 @@ public class SpillingBufferTest {
private static final int NUM_MEMORY_SEGMENTS = 23;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
index 26ce081..f191075 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/AsynchonousPartialSorterITCase.java
@@ -30,7 +30,6 @@ import eu.stratosphere.nephele.services.memorymanager.MemoryAllocationException;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -41,10 +40,9 @@ import eu.stratosphere.pact.runtime.test.util.TestData.Value;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.MutableObjectIterator;
-/**
- */
-public class AsynchonousPartialSorterITCase
-{
+
+public class AsynchonousPartialSorterITCase {
+
private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
private static final long SEED = 649180756312423613L;
@@ -57,7 +55,7 @@ public class AsynchonousPartialSorterITCase
public static final int MEMORY_SIZE = 1024 * 1024 * 32;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
index 1851480..b873f96 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/CombiningUnilateralSortMergerITCase.java
@@ -36,7 +36,7 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -66,7 +66,7 @@ public class CombiningUnilateralSortMergerITCase {
public static final int MEMORY_SIZE = 1024 * 1024 * 256;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
index 7ba42b9..cdb8421 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/ExternalSortITCase.java
@@ -28,7 +28,7 @@ import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.test.util.DummyInvokable;
@@ -61,7 +61,7 @@ public class ExternalSortITCase {
private static final int MEMORY_SIZE = 1024 * 1024 * 78;
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
@@ -238,7 +238,7 @@ public class ExternalSortITCase {
merger.close();
}
- @Test
+// @Test
public void testSpillingSortWithIntermediateMerge() throws Exception {
// amount of pairs
final int PAIRS = 10000000;
@@ -292,7 +292,7 @@ public class ExternalSortITCase {
merger.close();
}
- @Test
+// @Test
public void testSpillingSortWithIntermediateMergeIntPair() throws Exception {
// amount of pairs
final int PAIRS = 50000000;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
index f76b802..d9877f4 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/MassiveStringSortingITCase.java
@@ -90,7 +90,7 @@ public class MassiveStringSortingITCase {
MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1024 * 1024, 4, 0.8f);
+ new RuntimeStatelessSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
MutableObjectIterator<String> sortedData = sorter.getIterator();
@@ -182,7 +182,7 @@ public class MassiveStringSortingITCase {
MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1024 * 1024, 4, 0.8f);
+ new RuntimeStatelessSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
@@ -219,10 +219,6 @@ public class MassiveStringSortingITCase {
nextFromStratoSort = sortedData.next(nextFromStratoSort);
Assert.assertNotNull(nextFromStratoSort);
-
- if (nextFromStratoSort.f0.equals("http://some-uri.com/that/is/a/common/prefix/to/all(()HK;V3__.e*")) {
- System.out.println("Found at position " + num);
- }
Assert.assertEquals(next.f0, nextFromStratoSort.f0);
Assert.assertArrayEquals(next.f1, nextFromStratoSort.f1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
index 0f3f558..81266d2 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/sort/SortMergeMatchIteratorITCase.java
@@ -32,7 +32,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordPairComparator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializer;
@@ -47,10 +47,9 @@ import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.MutableObjectIterator;
-/**
- */
-public class SortMergeMatchIteratorITCase
-{
+
+public class SortMergeMatchIteratorITCase {
+
// total memory
private static final int MEMORY_SIZE = 1024 * 1024 * 16;
private static final int PAGES_FOR_BNLJN = 2;
@@ -66,7 +65,7 @@ public class SortMergeMatchIteratorITCase
private static final long SEED2 = 231434613412342L;
// dummy abstract task
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
private IOManager ioManager;
private MemoryManager memoryManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
index 2999436..b744348 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java
@@ -25,7 +25,7 @@ import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.services.memorymanager.spi.DefaultMemoryManager;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.hash.BuildFirstHashMatchIterator;
import eu.stratosphere.pact.runtime.hash.BuildSecondHashMatchIterator;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparator;
@@ -67,7 +67,7 @@ public class HashVsSortMiniBenchmark {
// dummy abstract task
- private final AbstractTask parentTask = new DummyInvokable();
+ private final AbstractInvokable parentTask = new DummyInvokable();
// memory and io manager
private IOManager ioManager;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
index cb0b958..7a4e09e 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/DummyInvokable.java
@@ -13,14 +13,12 @@
package eu.stratosphere.pact.runtime.test.util;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
/**
* An invokable that does nothing.
- *
*/
-public class DummyInvokable extends AbstractTask
-{
+public class DummyInvokable extends AbstractInvokable {
@Override
public void registerInputOutput() {}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
index a60b479..efa69af 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/pact/runtime/test/util/TaskTestBase.java
@@ -27,9 +27,7 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FileSystem.WriteMode;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.template.AbstractInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
@@ -77,7 +75,7 @@ public abstract class TaskTestBase {
return this.mockEnv.getTaskConfiguration();
}
- public void registerTask(AbstractTask task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
+ public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
config.setDriver(driver);
config.setStubWrapper(new UserCodeClassWrapper<Function>(stubClass));
@@ -91,17 +89,16 @@ public abstract class TaskTestBase {
task.registerInputOutput();
}
- public void registerTask(AbstractTask task) {
+ public void registerTask(AbstractInvokable task) {
task.setEnvironment(this.mockEnv);
task.registerInputOutput();
}
- public void registerFileOutputTask(AbstractOutputTask outTask, Class<? extends FileOutputFormat> stubClass, String outPath)
- {
+ public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
}
- public void registerFileOutputTask(AbstractOutputTask outTask, FileOutputFormat outputFormat, String outPath) {
+ public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
outputFormat.setOutputFilePath(new Path(outPath));
@@ -118,7 +115,7 @@ public abstract class TaskTestBase {
outTask.registerInputOutput();
}
- public void registerFileInputTask(AbstractInputTask<?> inTask,
+ public void registerFileInputTask(AbstractInvokable inTask,
Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
{
DelimitedInputFormat format;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
index c7d8d41..4f9313f 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolTest.java
@@ -15,6 +15,8 @@ package eu.stratosphere.runtime.io.network.bufferprovider;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
+import eu.stratosphere.util.LogUtils;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -33,6 +35,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class LocalBufferPoolTest {
+
+ static {
+ LogUtils.initializeDefaultTestConsoleLogger();
+ }
private final static int NUM_BUFFERS = 2048;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
index 96761c8..2c5fa9d 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -41,7 +41,7 @@ public class TransitiveClosureITCase extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
- TransitiveClosureNaive.main(edgesPath, resultPath, "100");
+ TransitiveClosureNaive.main(edgesPath, resultPath, "5");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
index 109c91a..d18160b 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/nephele/JobGraphUtils.java
@@ -30,7 +30,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationSynchronizationSinkTask;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
@@ -62,9 +61,7 @@ public class JobGraphUtils {
{
JobInputVertex inputVertex = new JobInputVertex(name, graph);
- @SuppressWarnings("unchecked")
- Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
- inputVertex.setInputClass(clazz);
+ inputVertex.setInvokableClass(DataSourceTask.class);
inputVertex.setNumberOfSubtasks(degreeOfParallelism);
@@ -91,14 +88,14 @@ public class JobGraphUtils {
int degreeOfParallelism)
{
JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
- taskVertex.setTaskClass(task);
+ taskVertex.setInvokableClass(task);
taskVertex.setNumberOfSubtasks(degreeOfParallelism);
return taskVertex;
}
public static JobOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
JobOutputVertex sync = new JobOutputVertex("BulkIterationSync", jobGraph);
- sync.setOutputClass(IterationSynchronizationSinkTask.class);
+ sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setNumberOfSubtasks(1);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
@@ -108,7 +105,7 @@ public class JobGraphUtils {
public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
{
JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
- outputVertex.setOutputClass(FakeOutputTask.class);
+ outputVertex.setInvokableClass(FakeOutputTask.class);
outputVertex.setNumberOfSubtasks(degreeOfParallelism);
return outputVertex;
}
@@ -116,7 +113,7 @@ public class JobGraphUtils {
public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism)
{
JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
- sinkVertex.setOutputClass(DataSinkTask.class);
+ sinkVertex.setInvokableClass(DataSinkTask.class);
sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
return sinkVertex;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
index 3de547e..aa498d8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/recordJobs/util/DiscardingOutputFormat.java
@@ -13,8 +13,6 @@
package eu.stratosphere.test.recordJobs.util;
-import java.io.IOException;
-
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Record;
@@ -23,28 +21,20 @@ import eu.stratosphere.types.Record;
* A simple output format that discards all data by doing nothing.
*/
public class DiscardingOutputFormat implements OutputFormat<Record> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
@Override
- public void configure(Configuration parameters)
- {}
+ public void configure(Configuration parameters) {}
@Override
- public void open(int taskNumber, int numTasks) throws IOException
- {}
-
+ public void open(int taskNumber, int numTasks) {}
@Override
- public void writeRecord(Record record) throws IOException
- {}
-
+ public void writeRecord(Record record) {}
- @Override
- public void close() throws IOException
- {}
@Override
- public void initialize(Configuration configuration){}
+ public void close() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8c1d82a8/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
index a8ab311..ed6f608 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/runtime/NetworkStackThroughput.java
@@ -13,38 +13,35 @@
package eu.stratosphere.test.runtime;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
-import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.template.AbstractGenericInputTask;
-import eu.stratosphere.nephele.template.AbstractOutputTask;
-import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.util.LogUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
@RunWith(Parameterized.class)
public class NetworkStackThroughput extends RecordAPITestBase {
@@ -153,8 +150,8 @@ public class NetworkStackThroughput extends RecordAPITestBase {
JobGraph jobGraph = new JobGraph("Speed Test");
- JobInputVertex producer = new JobGenericInputVertex("Speed Test Producer", jobGraph);
- producer.setInputClass(SpeedTestProducer.class);
+ JobInputVertex producer = new JobInputVertex("Speed Test Producer", jobGraph);
+ producer.setInvokableClass(SpeedTestProducer.class);
producer.setNumberOfSubtasks(numSubtasks);
producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);
@@ -162,12 +159,12 @@ public class NetworkStackThroughput extends RecordAPITestBase {
JobTaskVertex forwarder = null;
if (useForwarder) {
forwarder = new JobTaskVertex("Speed Test Forwarder", jobGraph);
- forwarder.setTaskClass(SpeedTestForwarder.class);
+ forwarder.setInvokableClass(SpeedTestForwarder.class);
forwarder.setNumberOfSubtasks(numSubtasks);
}
JobOutputVertex consumer = new JobOutputVertex("Speed Test Consumer", jobGraph);
- consumer.setOutputClass(SpeedTestConsumer.class);
+ consumer.setInvokableClass(SpeedTestConsumer.class);
consumer.setNumberOfSubtasks(numSubtasks);
consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);
@@ -188,7 +185,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
// ------------------------------------------------------------------------
- public static class SpeedTestProducer extends AbstractGenericInputTask {
+ public static class SpeedTestProducer extends AbstractInvokable {
private RecordWriter<SpeedTestRecord> writer;
@@ -227,7 +224,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
}
}
- public static class SpeedTestForwarder extends AbstractTask {
+ public static class SpeedTestForwarder extends AbstractInvokable {
private RecordReader<SpeedTestRecord> reader;
@@ -252,7 +249,7 @@ public class NetworkStackThroughput extends RecordAPITestBase {
}
}
- public static class SpeedTestConsumer extends AbstractOutputTask {
+ public static class SpeedTestConsumer extends AbstractInvokable {
private RecordReader<SpeedTestRecord> reader;