You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/18 20:52:27 UTC
[1/6] git commit: [FLINK-950] Fix spurious file deletion failures.
Repository: incubator-flink
Updated Branches:
refs/heads/master 3a452e5bb -> 9ecb6df76
[FLINK-950] Fix spurious file deletion failures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5c23b8fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5c23b8fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5c23b8fc
Branch: refs/heads/master
Commit: 5c23b8fc332f8f92fda36233b45dfa88fb86ede6
Parents: 3a452e5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 18:46:21 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:46:21 2014 +0200
----------------------------------------------------------------------
.../eu/stratosphere/core/fs/FileSystem.java | 32 ++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5c23b8fc/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index 007a74f..11c7007 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -446,12 +446,20 @@ public abstract class FileSystem {
return false;
}
+ FileStatus status = null;
+ try {
+ status = getFileStatus(outPath);
+ }
+ catch (FileNotFoundException e) {
+ // okay, the file is not there
+ }
+
// check if path exists
- if(this.exists(outPath)) {
+ if (status != null) {
// path exists, check write mode
- switch(writeMode) {
+ switch (writeMode) {
case NO_OVERWRITE:
- if(this.getFileStatus(outPath).isDir()) {
+ if (status.isDir()) {
return true;
} else {
// file may not be overwritten
@@ -460,8 +468,8 @@ public abstract class FileSystem {
" mode to overwrite existing files and directories.");
}
case OVERWRITE:
- if(this.getFileStatus(outPath).isDir()) {
- if(createDirectory) {
+ if (status.isDir()) {
+ if (createDirectory) {
// directory exists and does not need to be created
return true;
} else {
@@ -469,7 +477,9 @@ public abstract class FileSystem {
try {
this.delete(outPath, true);
} catch(IOException ioe) {
- throw new IOException("Could not prepare output path. ",ioe);
+ // due to races in some file systems, it may spuriously occur that a deleted file looks
+ // as if it still exists and is gone a millisecond later, once the change is committed;
+ // we therefore ignore the exception
}
}
} else {
@@ -488,7 +498,7 @@ public abstract class FileSystem {
}
}
- if(createDirectory) {
+ if (createDirectory) {
// Output directory needs to be created
try {
if(!this.exists(outPath)) {
@@ -501,9 +511,13 @@ public abstract class FileSystem {
}
// double check that the output directory exists
- return this.exists(outPath) && this.getFileStatus(outPath).isDir();
+ try {
+ FileStatus check = getFileStatus(outPath);
+ return check.isDir();
+ } catch (FileNotFoundException e) {
+ return false;
+ }
} else {
-
// check that the output path does not exist and an output file can be created by the output format.
return !this.exists(outPath);
}
[5/6] git commit: [FLINK-944] CollectionInputFormat now uses the
TypeSerializer to serialize the collection entries. This allows to use
objects not implementing the Serializable interface as collection elements.
Posted by se...@apache.org.
[FLINK-944] CollectionInputFormat now uses the TypeSerializer to serialize the collection entries. This allows to use objects not implementing the Serializable interface as collection elements.
This closes #25
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5fa5e502
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5fa5e502
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5fa5e502
Branch: refs/heads/master
Commit: 5fa5e50205afb41ebc39f197da07e87c0fdd0334
Parents: f3930e3
Author: Till Rohrmann <ti...@gmail.com>
Authored: Tue Jun 17 19:21:02 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 19:03:18 2014 +0200
----------------------------------------------------------------------
.../api/common/io/ByteArrayInputView.java | 114 +++++++++++++++++
.../api/common/io/ByteArrayOutputView.java | 126 +++++++++++++++++++
.../api/java/ExecutionEnvironment.java | 2 +-
.../api/java/io/CollectionInputFormat.java | 68 ++++++++--
.../api/java/io/CollectionInputFormatTest.java | 107 ++++++++++++++++
5 files changed, 407 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
new file mode 100644
index 0000000..7e65f13
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.common.io;
+
+import eu.stratosphere.core.memory.DataInputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Wrapper to use ByteArrayInputStream with TypeSerializers
+ */
+public class ByteArrayInputView implements DataInputView{
+
+ private final ByteArrayInputStream byteArrayInputStream;
+ private final DataInputStream inputStream;
+
+ public ByteArrayInputView(byte[] buffer){
+ byteArrayInputStream = new ByteArrayInputStream(buffer);
+ inputStream = new DataInputStream(byteArrayInputStream);
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ inputStream.skipBytes(numBytes);
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ inputStream.readFully(b);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ inputStream.readFully(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ return inputStream.skipBytes(n);
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return inputStream.readBoolean();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return inputStream.readByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return inputStream.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return inputStream.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return inputStream.readUnsignedShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return inputStream.readChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return inputStream.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return inputStream.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return inputStream.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return inputStream.readDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ return inputStream.readLine();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ return inputStream.readUTF();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
new file mode 100644
index 0000000..b96338f
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.common.io;
+
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Wrapper class to use ByteArrayOutputStream with TypeSerializers.
+ */
+public class ByteArrayOutputView implements DataOutputView {
+ private final ByteArrayOutputStream byteOutputStream;
+ private final DataOutputStream outputStream;
+
+ public ByteArrayOutputView(){
+ byteOutputStream = new ByteArrayOutputStream();
+ outputStream = new DataOutputStream(byteOutputStream);
+ }
+
+ public byte[] getByteArray(){
+ return byteOutputStream.toByteArray();
+ }
+
+ public void reset() {
+ byteOutputStream.reset();
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ for(int i=0; i<numBytes; i++){
+ writeByte(0);
+ }
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ source.readFully(buffer);
+ outputStream.write(buffer);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outputStream.write(b, off, len);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ outputStream.writeBoolean(v);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ outputStream.writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ outputStream.writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ outputStream.writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ outputStream.writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ outputStream.writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ outputStream.writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ outputStream.writeDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ outputStream.writeBytes(s);
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ outputStream.writeChars(s);
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ outputStream.writeUTF(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index 2f7aef3..1340477 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -362,7 +362,7 @@ public abstract class ExecutionEnvironment {
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
CollectionInputFormat.checkCollection(data, type.getTypeClass());
- return new DataSource<X>(this, new CollectionInputFormat<X>(data), type);
+ return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index 8d051af..5e513b2 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -16,14 +16,20 @@
package eu.stratosphere.api.java.io;
import java.io.IOException;
-import java.io.Serializable;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
-import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.common.io.ByteArrayInputView;
+import eu.stratosphere.api.common.io.ByteArrayOutputView;
import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.io.NonParallelInput;
+import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.GenericInputSplit;
+import eu.stratosphere.core.memory.DataInputView;
/**
* An input format that returns objects from a collection.
@@ -32,16 +38,19 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private static final long serialVersionUID = 1L;
- private final Collection<T> dataSet; // input data as collection
+ private Collection<T> dataSet; // input data as collection
+
+ private TypeSerializer<T> serializer;
private transient Iterator<T> iterator;
-
- public CollectionInputFormat(Collection<T> dataSet) {
+ public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
if (dataSet == null) {
throw new NullPointerException();
}
+
+ this.serializer = serializer;
this.dataSet = dataSet;
}
@@ -63,6 +72,51 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
public T nextRecord(T record) throws IOException {
return this.iterator.next();
}
+
+ // --------------------------------------------------------------------------------------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException{
+ out.writeObject(serializer);
+ out.writeInt(dataSet.size());
+ ByteArrayOutputView outputView = new ByteArrayOutputView();
+ for(T element : dataSet){
+ serializer.serialize(element, outputView);
+ }
+
+ byte[] blob = outputView.getByteArray();
+ out.writeInt(blob.length);
+ out.write(blob);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException{
+ try{
+ Object obj = in.readObject();
+
+ if(obj instanceof TypeSerializer<?>){
+ serializer = (TypeSerializer<T>)obj;
+ }
+ }catch(ClassNotFoundException ex){
+ throw new IOException(ex);
+ }
+
+
+ int collectionLength = in.readInt();
+ List<T> list = new ArrayList<T>(collectionLength);
+
+ int blobLength = in.readInt();
+ byte[] blob = new byte[blobLength];
+ in.readFully(blob);
+
+ DataInputView inputView = new ByteArrayInputView(blob);
+
+ for(int i=0; i< collectionLength; i++){
+ T element = serializer.createInstance();
+ element = serializer.deserialize(element, inputView);
+ list.add(element);
+ }
+
+ dataSet = list;
+ }
// --------------------------------------------------------------------------------------------
@@ -78,10 +132,6 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
throw new NullPointerException();
}
- if (!Serializable.class.isAssignableFrom(viewedAs)) {
- throw new InvalidProgramException("The elements are not serializable (java.io.Serializable).");
- }
-
for (X elem : elements) {
if (elem == null) {
throw new IllegalArgumentException("The collection must not contain null elements.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
new file mode 100644
index 0000000..781e7a8
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.java.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+import eu.stratosphere.core.io.GenericInputSplit;
+import eu.stratosphere.types.TypeInformation;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class CollectionInputFormatTest {
+ public static class ElementType{
+ private int id;
+
+ public ElementType(){
+ this(-1);
+ }
+
+ public ElementType(int id){
+ this.id = id;
+ }
+
+ public int getId(){return id;}
+
+ @Override
+ public boolean equals(Object obj){
+ if(obj != null && obj instanceof ElementType){
+ ElementType et = (ElementType) obj;
+
+ return et.getId() == this.getId();
+ }else {
+ return false;
+ }
+ }
+ }
+
+ @Test
+ public void testSerializability(){
+ Collection<ElementType> inputCollection = new ArrayList<ElementType>();
+ ElementType element1 = new ElementType(1);
+ ElementType element2 = new ElementType(2);
+ ElementType element3 = new ElementType(3);
+ inputCollection.add(element1);
+ inputCollection.add(element2);
+ inputCollection.add(element3);
+
+ TypeInformation<ElementType> info = (TypeInformation<ElementType>)TypeExtractor.createTypeInfo(ElementType
+ .class);
+
+ CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
+ info.createSerializer());
+
+ try{
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+
+ out.writeObject(inputFormat);
+
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+
+ Object serializationResult = in.readObject();
+
+ assertNotNull(serializationResult);
+ assertTrue(serializationResult instanceof CollectionInputFormat<?>);
+
+ CollectionInputFormat<ElementType> result = (CollectionInputFormat<ElementType>) serializationResult;
+
+ GenericInputSplit inputSplit = new GenericInputSplit();
+ inputFormat.open(inputSplit);
+ result.open(inputSplit);
+
+ while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+ ElementType expectedElement = inputFormat.nextRecord(null);
+ ElementType actualElement = result.nextRecord(null);
+
+ assertEquals(expectedElement, actualElement);
+ }
+ }catch(IOException ex){
+ fail(ex.toString());
+ }catch(ClassNotFoundException ex){
+ fail(ex.toString());
+ }
+ }
+}
[3/6] git commit: [FLINK-943] Remove leading and ending double quotes
from 'env.java.opts' config value in startup scripts
Posted by se...@apache.org.
[FLINK-943] Remove leading and ending double quotes from 'env.java.opts' config value in startup scripts
This closes #26
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b2bd4697
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b2bd4697
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b2bd4697
Branch: refs/heads/master
Commit: b2bd4697af1984a00af3ded443b93528cf146769
Parents: 6ee874a
Author: uce <u....@fu-berlin.de>
Authored: Wed Jun 18 00:48:15 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:54:10 2014 +0200
----------------------------------------------------------------------
stratosphere-dist/src/main/stratosphere-bin/bin/config.sh | 3 +++
stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh | 2 +-
stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh | 2 +-
3 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
index 9168813..64f8ea7 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
@@ -169,6 +169,9 @@ fi
if [ -z "${STRATOSPHERE_ENV_JAVA_OPTS}" ]; then
STRATOSPHERE_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+ # Remove leading and trailing double quotes (if present) from the value
+ STRATOSPHERE_ENV_JAVA_OPTS="$( echo "${STRATOSPHERE_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
fi
if [ -z "${STRATOSPHERE_SSH_OPTS}" ]; then
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
index e474e5e..a2ef1be 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
@@ -72,7 +72,7 @@ case $STARTSTOP in
rotateLogFile $out
echo Starting job manager
- $JAVA_RUN $JVM_ARGS $STRATOSPHERE_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "$STRATOSPHERE_JM_CLASSPATH" eu.stratosphere.nephele.jobmanager.JobManager -executionMode $EXECUTIONMODE -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
+ $JAVA_RUN $JVM_ARGS ${STRATOSPHERE_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$STRATOSPHERE_JM_CLASSPATH" eu.stratosphere.nephele.jobmanager.JobManager -executionMode $EXECUTIONMODE -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
index ea6e468..aa92b6e 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
@@ -69,7 +69,7 @@ case $STARTSTOP in
rotateLogFile $out
echo Starting task manager on host $HOSTNAME
- $JAVA_RUN $JVM_ARGS $STRATOSPHERE_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "$STRATOSPHERE_TM_CLASSPATH" eu.stratosphere.nephele.taskmanager.TaskManager -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
+ $JAVA_RUN $JVM_ARGS ${STRATOSPHERE_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$STRATOSPHERE_TM_CLASSPATH" eu.stratosphere.nephele.taskmanager.TaskManager -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
[2/6] git commit: Projection constructor adjusts to max tuple size
Posted by se...@apache.org.
Projection constructor adjusts to max tuple size
This closes #27
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6ee874aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6ee874aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6ee874aa
Branch: refs/heads/master
Commit: 6ee874aa05e0eeda133bcc6100cb2330e1d27154
Parents: 5c23b8f
Author: zentol <s....@web.de>
Authored: Wed Jun 18 14:06:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:51:01 2014 +0200
----------------------------------------------------------------------
.../eu/stratosphere/api/java/operators/ProjectOperator.java | 5 +++--
.../stratosphere/api/java/operator/ProjectionOperatorTest.java | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ee874aa/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
index 31583e9..1907393 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
@@ -72,8 +72,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
if(fieldIndexes.length == 0) {
throw new IllegalArgumentException("project() needs to select at least one (1) field.");
- } else if(fieldIndexes.length > 22) {
- throw new IllegalArgumentException("project() may select only up to twenty-two (22) fields.");
+ } else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
+ throw new IllegalArgumentException(
+ "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
}
int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ee874aa/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
index a5ac562..ac496c2 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
@@ -62,7 +62,7 @@ public class ProjectionOperatorTest {
// should not work: too many fields
try {
- tupleDs.project(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22);
+ tupleDs.project(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25);
Assert.fail();
} catch(IllegalArgumentException iae) {
// we're good here
[6/6] git commit: [FLINK-944] Consolidated serialization logic
between different classes. Fixes for warnings.
Posted by se...@apache.org.
[FLINK-944] Consolidated serialization logic between different classes. Fixes for warnings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9ecb6df7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9ecb6df7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9ecb6df7
Branch: refs/heads/master
Commit: 9ecb6df76ca5e7a42d0d62b1c9dca41e9e7ecd1b
Parents: 5fa5e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 19:21:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 20:51:52 2014 +0200
----------------------------------------------------------------------
.../api/common/io/ByteArrayInputView.java | 114 -----------------
.../api/common/io/ByteArrayOutputView.java | 126 -------------------
.../core/memory/InputViewDataInputWrapper.java | 113 +++++++++++++++++
.../memory/OutputViewDataOutputWrapper.java | 116 +++++++++++++++++
.../java/graph/TransitiveClosureNaive.java | 2 +-
.../api/java/io/CollectionInputFormat.java | 49 +++-----
.../api/java/io/CollectionInputFormatTest.java | 11 +-
.../plugable/DeserializationDelegate.java | 102 +--------------
.../runtime/plugable/SerializationDelegate.java | 106 +---------------
.../CoGroupConnectedComponentsSecondITCase.java | 4 -
10 files changed, 262 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
deleted file mode 100644
index 7e65f13..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package eu.stratosphere.api.common.io;
-
-import eu.stratosphere.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-/**
- * Wrapper to use ByteArrayInputStream with TypeSerializers
- */
-public class ByteArrayInputView implements DataInputView{
-
- private final ByteArrayInputStream byteArrayInputStream;
- private final DataInputStream inputStream;
-
- public ByteArrayInputView(byte[] buffer){
- byteArrayInputStream = new ByteArrayInputStream(buffer);
- inputStream = new DataInputStream(byteArrayInputStream);
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- inputStream.skipBytes(numBytes);
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- inputStream.readFully(b);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- inputStream.readFully(b, off, len);
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- return inputStream.skipBytes(n);
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- return inputStream.readBoolean();
- }
-
- @Override
- public byte readByte() throws IOException {
- return inputStream.readByte();
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return inputStream.readUnsignedByte();
- }
-
- @Override
- public short readShort() throws IOException {
- return inputStream.readShort();
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return inputStream.readUnsignedShort();
- }
-
- @Override
- public char readChar() throws IOException {
- return inputStream.readChar();
- }
-
- @Override
- public int readInt() throws IOException {
- return inputStream.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return inputStream.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return inputStream.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return inputStream.readDouble();
- }
-
- @Override
- public String readLine() throws IOException {
- return inputStream.readLine();
- }
-
- @Override
- public String readUTF() throws IOException {
- return inputStream.readUTF();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
deleted file mode 100644
index b96338f..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package eu.stratosphere.api.common.io;
-
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.core.memory.DataOutputView;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Wrapper class to use ByteArrayOutputStream with TypeSerializers.
- */
-public class ByteArrayOutputView implements DataOutputView {
- private final ByteArrayOutputStream byteOutputStream;
- private final DataOutputStream outputStream;
-
- public ByteArrayOutputView(){
- byteOutputStream = new ByteArrayOutputStream();
- outputStream = new DataOutputStream(byteOutputStream);
- }
-
- public byte[] getByteArray(){
- return byteOutputStream.toByteArray();
- }
-
- public void reset() {
- byteOutputStream.reset();
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- for(int i=0; i<numBytes; i++){
- writeByte(0);
- }
- }
-
- @Override
- public void write(DataInputView source, int numBytes) throws IOException {
- byte[] buffer = new byte[numBytes];
- source.readFully(buffer);
- outputStream.write(buffer);
- }
-
- @Override
- public void write(int b) throws IOException {
- outputStream.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- outputStream.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- outputStream.write(b, off, len);
- }
-
- @Override
- public void writeBoolean(boolean v) throws IOException {
- outputStream.writeBoolean(v);
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- outputStream.writeByte(v);
- }
-
- @Override
- public void writeShort(int v) throws IOException {
- outputStream.writeShort(v);
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- outputStream.writeChar(v);
- }
-
- @Override
- public void writeInt(int v) throws IOException {
- outputStream.writeInt(v);
- }
-
- @Override
- public void writeLong(long v) throws IOException {
- outputStream.writeLong(v);
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- outputStream.writeFloat(v);
- }
-
- @Override
- public void writeDouble(double v) throws IOException {
- outputStream.writeDouble(v);
- }
-
- @Override
- public void writeBytes(String s) throws IOException {
- outputStream.writeBytes(s);
- }
-
- @Override
- public void writeChars(String s) throws IOException {
- outputStream.writeChars(s);
- }
-
- @Override
- public void writeUTF(String s) throws IOException {
- outputStream.writeUTF(s);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
new file mode 100644
index 0000000..c2451ef
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
@@ -0,0 +1,113 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.core.memory;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * A utility that presents a {@link DataInput} as a {@link DataInputView}.
+ */
+public class InputViewDataInputWrapper implements DataInputView {
+
+ private DataInput delegate;
+
+ public void setDelegate(DataInput delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ this.delegate.readFully(b);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ this.delegate.readFully(b, off, len);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ return this.delegate.skipBytes(n);
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return this.delegate.readBoolean();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return this.delegate.readByte();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ return this.delegate.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return this.delegate.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ return this.delegate.readUnsignedShort();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return this.delegate.readChar();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return this.delegate.readInt();
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return this.delegate.readLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return this.delegate.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return this.delegate.readDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ return this.delegate.readLine();
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ return this.delegate.readUTF();
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; i++) {
+ this.delegate.readByte();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
new file mode 100644
index 0000000..7bb8f8c
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
@@ -0,0 +1,116 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.core.memory;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A utility that presents a {@link DataOutput} as a {@link DataOutputView}.
+ */
+public class OutputViewDataOutputWrapper implements DataOutputView {
+
+ private DataOutput delegate;
+
+ public void setDelegate(DataOutput delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ this.delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ this.delegate.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ this.delegate.write(b, off, len);
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ this.delegate.writeBoolean(v);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ this.delegate.writeByte(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ this.delegate.writeShort(v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ this.delegate.writeChar(v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ this.delegate.writeInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ this.delegate.writeLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ this.delegate.writeFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ this.delegate.writeDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ this.delegate.writeBytes(s);
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ this.delegate.writeChars(s);
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ this.delegate.writeUTF(s);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ // skip by writing zeros.
+ for (int i = 0; i < numBytes; i++) {
+ this.delegate.writeByte(0);
+ }
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ for (int i = 0; i < numBytes; i++) {
+ this.delegate.writeByte(source.readByte());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
index 3fd1627..79585ec 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
@@ -25,7 +25,7 @@ import eu.stratosphere.util.Collector;
import java.util.Iterator;
-
+@SuppressWarnings("serial")
public class TransitiveClosureNaive implements ProgramDescription {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index 5e513b2..fd5ae36 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -23,13 +23,12 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import eu.stratosphere.api.common.io.ByteArrayInputView;
-import eu.stratosphere.api.common.io.ByteArrayOutputView;
import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.io.NonParallelInput;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.GenericInputSplit;
-import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.InputViewDataInputWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
/**
* An input format that returns objects from a collection.
@@ -75,43 +74,35 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
// --------------------------------------------------------------------------------------------
- private void writeObject(ObjectOutputStream out) throws IOException{
+ private void writeObject(ObjectOutputStream out) throws IOException {
out.writeObject(serializer);
out.writeInt(dataSet.size());
- ByteArrayOutputView outputView = new ByteArrayOutputView();
- for(T element : dataSet){
- serializer.serialize(element, outputView);
+
+ OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper();
+ outWrapper.setDelegate(out);
+
+ for (T element : dataSet){
+ serializer.serialize(element, outWrapper);
}
-
- byte[] blob = outputView.getByteArray();
- out.writeInt(blob.length);
- out.write(blob);
}
- private void readObject(ObjectInputStream in) throws IOException{
- try{
- Object obj = in.readObject();
-
- if(obj instanceof TypeSerializer<?>){
- serializer = (TypeSerializer<T>)obj;
- }
- }catch(ClassNotFoundException ex){
- throw new IOException(ex);
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException {
+ try {
+ this.serializer = (TypeSerializer<T>) in.readObject();
+ } catch (ClassNotFoundException ex){
+ throw new IOException("Could not load the serializer class.", ex);
}
-
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
+
+ InputViewDataInputWrapper inWrapper = new InputViewDataInputWrapper();
+ inWrapper.setDelegate(in);
- int blobLength = in.readInt();
- byte[] blob = new byte[blobLength];
- in.readFully(blob);
-
- DataInputView inputView = new ByteArrayInputView(blob);
-
- for(int i=0; i< collectionLength; i++){
+ for (int i = 0; i < collectionLength; i++){
T element = serializer.createInstance();
- element = serializer.deserialize(element, inputView);
+ element = serializer.deserialize(element, inWrapper);
list.add(element);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
index 781e7a8..4388c9c 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
@@ -1,4 +1,4 @@
-/*
+/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
@@ -9,7 +9,7 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
- */
+ **********************************************************************************************************************/
package eu.stratosphere.api.java.io;
@@ -17,10 +17,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.types.TypeInformation;
+
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -67,8 +67,8 @@ public class CollectionInputFormatTest {
inputCollection.add(element2);
inputCollection.add(element3);
- TypeInformation<ElementType> info = (TypeInformation<ElementType>)TypeExtractor.createTypeInfo(ElementType
- .class);
+ @SuppressWarnings("unchecked")
+ TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
info.createSerializer());
@@ -86,6 +86,7 @@ public class CollectionInputFormatTest {
assertNotNull(serializationResult);
assertTrue(serializationResult instanceof CollectionInputFormat<?>);
+ @SuppressWarnings("unchecked")
CollectionInputFormat<ElementType> result = (CollectionInputFormat<ElementType>) serializationResult;
GenericInputSplit inputSplit = new GenericInputSplit();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
index a4958d6..00ac913 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
@@ -19,7 +19,7 @@ import java.io.IOException;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.InputViewDataInputWrapper;
public class DeserializationDelegate<T> implements IOReadableWritable {
@@ -28,12 +28,12 @@ public class DeserializationDelegate<T> implements IOReadableWritable {
private final TypeSerializer<T> serializer;
- private final InputViewWrapper wrapper;
+ private final InputViewDataInputWrapper wrapper;
public DeserializationDelegate(TypeSerializer<T> serializer) {
this.serializer = serializer;
- this.wrapper = new InputViewWrapper();
+ this.wrapper = new InputViewDataInputWrapper();
}
public void setInstance(T instance) {
@@ -54,100 +54,4 @@ public class DeserializationDelegate<T> implements IOReadableWritable {
this.wrapper.setDelegate(in);
this.instance = this.serializer.deserialize(this.instance, this.wrapper);
}
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Utility class that wraps a {@link DataInput} as a {@link DataInputView}.
- */
- private static final class InputViewWrapper implements DataInputView {
-
- private DataInput delegate;
-
- public void setDelegate(DataInput delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- this.delegate.readFully(b);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- this.delegate.readFully(b, off, len);
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- return this.delegate.skipBytes(n);
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- return this.delegate.readBoolean();
- }
-
- @Override
- public byte readByte() throws IOException {
- return this.delegate.readByte();
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- return this.delegate.readUnsignedByte();
- }
-
- @Override
- public short readShort() throws IOException {
- return this.delegate.readShort();
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- return this.delegate.readUnsignedShort();
- }
-
- @Override
- public char readChar() throws IOException {
- return this.delegate.readChar();
- }
-
- @Override
- public int readInt() throws IOException {
- return this.delegate.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return this.delegate.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return this.delegate.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return this.delegate.readDouble();
- }
-
- @Override
- public String readLine() throws IOException {
- return this.delegate.readLine();
- }
-
- @Override
- public String readUTF() throws IOException {
- return this.delegate.readUTF();
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- for (int i = 0; i < numBytes; i++) {
- this.delegate.readByte();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
index 1c54f86..7e940cf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
@@ -19,8 +19,7 @@ import java.io.IOException;
import eu.stratosphere.api.common.typeutils.TypeSerializer;
import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.core.memory.DataOutputView;
+import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
public class SerializationDelegate<T> implements IOReadableWritable {
@@ -29,12 +28,12 @@ public class SerializationDelegate<T> implements IOReadableWritable {
private final TypeSerializer<T> serializer;
- private final OutputViewWrapper wrapper;
+ private final OutputViewDataOutputWrapper wrapper;
public SerializationDelegate(TypeSerializer<T> serializer) {
this.serializer = serializer;
- this.wrapper = new OutputViewWrapper();
+ this.wrapper = new OutputViewDataOutputWrapper();
}
public void setInstance(T instance) {
@@ -56,103 +55,4 @@ public class SerializationDelegate<T> implements IOReadableWritable {
public void read(DataInput in) throws IOException {
throw new IllegalStateException("Deserialization method called on SerializationDelegate.");
}
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Utility class that wraps a {@link DataOutput} as a {@link DataOutputView}.
- */
- private static final class OutputViewWrapper implements DataOutputView {
-
- private DataOutput delegate;
-
- public void setDelegate(DataOutput delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void write(int b) throws IOException {
- this.delegate.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- this.delegate.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- this.delegate.write(b, off, len);
- }
-
- @Override
- public void writeBoolean(boolean v) throws IOException {
- this.delegate.writeBoolean(v);
- }
-
- @Override
- public void writeByte(int v) throws IOException {
- this.delegate.writeByte(v);
- }
-
- @Override
- public void writeShort(int v) throws IOException {
- this.delegate.writeShort(v);
- }
-
- @Override
- public void writeChar(int v) throws IOException {
- this.delegate.writeChar(v);
- }
-
- @Override
- public void writeInt(int v) throws IOException {
- this.delegate.writeInt(v);
- }
-
- @Override
- public void writeLong(long v) throws IOException {
- this.delegate.writeLong(v);
- }
-
- @Override
- public void writeFloat(float v) throws IOException {
- this.delegate.writeFloat(v);
- }
-
- @Override
- public void writeDouble(double v) throws IOException {
- this.delegate.writeDouble(v);
- }
-
- @Override
- public void writeBytes(String s) throws IOException {
- this.delegate.writeBytes(s);
- }
-
- @Override
- public void writeChars(String s) throws IOException {
- this.delegate.writeChars(s);
- }
-
- @Override
- public void writeUTF(String s) throws IOException {
- this.delegate.writeUTF(s);
- }
-
- @Override
- public void skipBytesToWrite(int numBytes) throws IOException {
- // skip by writing zeros.
- for (int i = 0; i < numBytes; i++) {
- this.delegate.writeByte(0);
- }
- }
-
- @Override
- public void write(DataInputView source, int numBytes) throws IOException {
- for (int i = 0; i < numBytes; i++) {
- this.delegate.writeByte(source.readByte());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 1a0f443..103effa 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -32,8 +32,6 @@ import eu.stratosphere.example.java.graph.ConnectedComponents.NeighborWithCompon
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
-import eu.stratosphere.util.LogUtils;
-
@SuppressWarnings("serial")
public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
@@ -51,8 +49,6 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- LogUtils.initializeDefaultConsoleLogger();
-
// read vertex and edge data
DataSet<Long> vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n"))
.map(new VertexParser());
[4/6] git commit: Transitive closure example in Java API
Posted by se...@apache.org.
Transitive closure example in Java API
This closes #24
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f3930e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f3930e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f3930e3d
Branch: refs/heads/master
Commit: f3930e3dba63d98f2b0fc7986fb507f9afa8283a
Parents: b2bd469
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Fri Jun 6 17:22:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:57:22 2014 +0200
----------------------------------------------------------------------
.../stratosphere-java-examples/pom.xml | 26 +++-
.../java/graph/TransitiveClosureNaive.java | 132 +++++++++++++++++++
.../test/testdata/TransitiveClosureData.java | 44 +++++++
.../TransitiveClosureITCase.java | 54 ++++++++
4 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/pom.xml b/stratosphere-examples/stratosphere-java-examples/pom.xml
index 3fd4c9b..705c46c 100644
--- a/stratosphere-examples/stratosphere-java-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-java-examples/pom.xml
@@ -57,7 +57,31 @@
</includes>
</configuration>
</execution>
-
+
+ <!-- Transitive Closure -->
+ <execution>
+ <id>TransitiveClosure</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>TransitiveClosure</classifier>
+ <!-- the manifest's program-class entry points at the example's main class -->
+ <archive>
+ <manifestEntries>
+ <program-class>eu.stratosphere.example.java.graph.TransitiveClosureNaive</program-class>
+ </manifestEntries>
+ </archive>
+ <!-- package only the example class, its nested/anonymous classes, and the default-data helper it calls -->
+ <includes>
+ <include>**/java/graph/TransitiveClosureNaive.class</include>
+ <include>**/java/graph/TransitiveClosureNaive$*.class</include>
+ <include>**/java/graph/util/ConnectedComponentsData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
<!-- Connected Components -->
<execution>
<id>ConnectedComponents</id>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
new file mode 100644
index 0000000..3fd1627
--- /dev/null
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
@@ -0,0 +1,132 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.example.java.graph;
+
+import eu.stratosphere.api.common.ProgramDescription;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.example.java.graph.util.ConnectedComponentsData;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+
+public class TransitiveClosureNaive implements ProgramDescription {
+ // Example program: naive transitive closure of a directed edge set, built by repeatedly joining known paths with the edges.
+ // Entry point. With three arguments, edges are read from a file and the result is written as CSV; with none, built-in data is used and the result is printed.
+ public static void main (String... args) throws Exception{
+ // stop quietly when the arguments were rejected; parseParameters has already printed the usage line
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+ // the edges themselves are the initial set of paths; maxIterations is passed as the iteration bound
+ IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);
+ // one step: extend each known path (z,x) by an edge (x,y), merge with the known paths, keep one tuple per distinct (from,to) pair
+ DataSet<Tuple2<Long,Long>> nextPaths = paths
+ .join(edges)
+ .where(1)
+ .equalTo(0)
+ .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ /**
+ left: Path (z,x) - x is reachable by z
+ right: Edge (x,y) - edge x-->y exists
+ out: Path (z,y) - y is reachable by z
+ */
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+ return new Tuple2<Long, Long>(
+ new Long(left.f0), // NOTE(review): explicit boxing copy; presumably unnecessary if f0 is already a Long
+ new Long(right.f1));
+ }
+ })
+ .union(paths)
+ .groupBy(0, 1)
+ .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(values.next()); // emit only the first tuple of each (from,to) group, i.e. drop duplicates
+ }
+ });
+ // close the iteration with the step result (presumably fed back as the next round's paths; see the iteration API docs)
+ DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+
+
+ // emit result
+ if (fileOutput) {
+ transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ transitiveClosure.print();
+ }
+
+ // execute program
+ env.execute("Transitive Closure Example");
+
+ }
+ // Returns the one-line description of the expected command-line parameters.
+ @Override
+ public String getDescription() {
+ return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+ // Run configuration; these defaults stay in effect when the program is started without arguments.
+ private static boolean fileOutput = false;
+ private static String edgesPath = null;
+ private static String outputPath = null;
+ private static int maxIterations = 10;
+ // Parses the arguments into the static fields above. Returns true for zero or exactly three arguments; otherwise prints usage to stderr and returns false.
+ private static boolean parseParameters(String[] programArguments) {
+ // NOTE(review): fileOutput is switched on as soon as any argument is present, before the count is validated
+ if (programArguments.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if (programArguments.length == 3) {
+ edgesPath = programArguments[0];
+ outputPath = programArguments[1];
+ maxIterations = Integer.parseInt(programArguments[2]); // throws NumberFormatException when the third argument is not an integer
+ } else {
+ System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+ }
+ return true;
+ }
+
+ // Builds the edge data set: (Long, Long) pairs read from the space-delimited file at edgesPath, or the built-in default edges.
+ private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+ // fileOutput doubles as the "read the input from a file" switch
+ if(fileOutput) {
+ return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+ } else {
+ return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
new file mode 100644
index 0000000..f39216a
--- /dev/null
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.testdata;
+
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class TransitiveClosureData {
+ // Checks result lines of the form "<from> <to>": each line must hold exactly two integer tokens whose remainders mod 2 match; otherwise a JUnit assertion fails.
+ public static void checkOddEvenResult(BufferedReader result) throws IOException {
+ Pattern split = Pattern.compile(" ");
+ String line;
+ while ((line = result.readLine()) != null) {
+ String[] res = split.split(line);
+ Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
+ try {
+ int from = Integer.parseInt(res[0]);
+ int to = Integer.parseInt(res[1]);
+ // equal remainders mod 2 mean both ids are even or both are odd (for non-negative ids)
+ Assert.assertEquals("Vertex should not be reachable.", from % 2, to % 2);
+ } catch (NumberFormatException e) {
+ Assert.fail("Malformed result.");
+ }
+ }
+ }
+ // Private constructor: the class only offers the static check above.
+ private TransitiveClosureData() {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
new file mode 100644
index 0000000..96761c8
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -0,0 +1,54 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.exampleJavaPrograms;
+
+
+import eu.stratosphere.example.java.graph.TransitiveClosureNaive;
+import eu.stratosphere.test.testdata.ConnectedComponentsData;
+import eu.stratosphere.test.testdata.TransitiveClosureData;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+ // Integration test: runs TransitiveClosureNaive on a generated odd/even edge file and applies the parity check to every result reader.
+ private static final long SEED = 0xBADC0FFEEBEEFL;
+
+ private static final int NUM_VERTICES = 1000;
+
+ private static final int NUM_EDGES = 10000;
+
+ private String edgesPath;
+ private String resultPath;
+
+ // Writes the generated edge data to a temp file named "edges.txt" and obtains a temp path for the results.
+ @Override
+ protected void preSubmit() throws Exception {
+ edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+ resultPath = getTempFilePath("results");
+ }
+ // Runs the example's main with the edge file, the result path, and "100" as the max-iterations argument.
+ @Override
+ protected void testProgram() throws Exception {
+ TransitiveClosureNaive.main(edgesPath, resultPath, "100");
+ }
+ // Hands every reader returned for the result path to the odd/even check.
+ @Override
+ protected void postSubmit() throws Exception {
+ for (BufferedReader reader : getResultReader(resultPath)) {
+ TransitiveClosureData.checkOddEvenResult(reader);
+ }
+ }
+}
+