You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/18 20:52:27 UTC

[1/6] git commit: [FLINK-950] Fix spurious file deletion failures.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 3a452e5bb -> 9ecb6df76


[FLINK-950] Fix spurious file deletion failures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5c23b8fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5c23b8fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5c23b8fc

Branch: refs/heads/master
Commit: 5c23b8fc332f8f92fda36233b45dfa88fb86ede6
Parents: 3a452e5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 18:46:21 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:46:21 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/core/fs/FileSystem.java     | 32 ++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5c23b8fc/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
index 007a74f..11c7007 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/fs/FileSystem.java
@@ -446,12 +446,20 @@ public abstract class FileSystem {
 			return false;
 		}
 		
+		FileStatus status = null;
+		try {
+			status = getFileStatus(outPath);
+		}
+		catch (FileNotFoundException e) {
+			// okay, the file is not there
+		}
+		
 		// check if path exists
-		if(this.exists(outPath)) {
+		if (status != null) {
 			// path exists, check write mode
-			switch(writeMode) {
+			switch (writeMode) {
 			case NO_OVERWRITE:
-				if(this.getFileStatus(outPath).isDir()) {
+				if (status.isDir()) {
 					return true;
 				} else {
 					// file may not be overwritten
@@ -460,8 +468,8 @@ public abstract class FileSystem {
 							" mode to overwrite existing files and directories.");
 				}
 			case OVERWRITE:
-				if(this.getFileStatus(outPath).isDir()) {
-					if(createDirectory) {
+				if (status.isDir()) {
+					if (createDirectory) {
 						// directory exists and does not need to be created
 						return true;
 					} else {
@@ -469,7 +477,9 @@ public abstract class FileSystem {
 						try {
 							this.delete(outPath, true);
 						} catch(IOException ioe) {
-							throw new IOException("Could not prepare output path. ",ioe);
+							// due to races in some file systems, it may spuriously occur that a deleted the file looks
+							// as if it still exists and is gone a millisecond later, once the change is committed
+							// we ignore the exception
 						}
 					}
 				} else {
@@ -488,7 +498,7 @@ public abstract class FileSystem {
 			}
 		}
 		
-		if(createDirectory) {
+		if (createDirectory) {
 			// Output directory needs to be created
 			try {
 				if(!this.exists(outPath)) {
@@ -501,9 +511,13 @@ public abstract class FileSystem {
 			}
 	
 			// double check that the output directory exists
-			return this.exists(outPath) && this.getFileStatus(outPath).isDir();
+			try {
+				FileStatus check = getFileStatus(outPath);
+				return check.isDir();
+			} catch (FileNotFoundException e) {
+				return false;
+			}
 		} else {
-			
 			// check that the output path does not exist and an output file can be created by the output format.
 			return !this.exists(outPath);
 		}


[5/6] git commit: [FLINK-944] CollectionInputFormat now uses the TypeSerializer to serialize the collection entries. This allows to use objects not implementing the Serializable interface as collection elements.

Posted by se...@apache.org.
[FLINK-944] CollectionInputFormat now uses the TypeSerializer to serialize the collection entries. This allows to use objects not implementing the Serializable interface as collection elements.

This closes #25


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5fa5e502
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5fa5e502
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5fa5e502

Branch: refs/heads/master
Commit: 5fa5e50205afb41ebc39f197da07e87c0fdd0334
Parents: f3930e3
Author: Till Rohrmann <ti...@gmail.com>
Authored: Tue Jun 17 19:21:02 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 19:03:18 2014 +0200

----------------------------------------------------------------------
 .../api/common/io/ByteArrayInputView.java       | 114 +++++++++++++++++
 .../api/common/io/ByteArrayOutputView.java      | 126 +++++++++++++++++++
 .../api/java/ExecutionEnvironment.java          |   2 +-
 .../api/java/io/CollectionInputFormat.java      |  68 ++++++++--
 .../api/java/io/CollectionInputFormatTest.java  | 107 ++++++++++++++++
 5 files changed, 407 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
new file mode 100644
index 0000000..7e65f13
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.common.io;
+
+import eu.stratosphere.core.memory.DataInputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Wrapper to use ByteArrayInputStream with TypeSerializers
+ */
+public class ByteArrayInputView implements DataInputView{
+
+	private final ByteArrayInputStream byteArrayInputStream;
+	private final DataInputStream inputStream;
+
+	public ByteArrayInputView(byte[] buffer){
+		byteArrayInputStream = new ByteArrayInputStream(buffer);
+		inputStream = new DataInputStream(byteArrayInputStream);
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		inputStream.skipBytes(numBytes);
+	}
+
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		inputStream.readFully(b);
+	}
+
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		inputStream.readFully(b, off, len);
+	}
+
+	@Override
+	public int skipBytes(int n) throws IOException {
+		return inputStream.skipBytes(n);
+	}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return inputStream.readBoolean();
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		return inputStream.readByte();
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		return inputStream.readUnsignedByte();
+	}
+
+	@Override
+	public short readShort() throws IOException {
+		return inputStream.readShort();
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+		return inputStream.readUnsignedShort();
+	}
+
+	@Override
+	public char readChar() throws IOException {
+		return inputStream.readChar();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return inputStream.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return inputStream.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return inputStream.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return inputStream.readDouble();
+	}
+
+	@Override
+	public String readLine() throws IOException {
+		return inputStream.readLine();
+	}
+
+	@Override
+	public String readUTF() throws IOException {
+		return inputStream.readUTF();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
new file mode 100644
index 0000000..b96338f
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.common.io;
+
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Wrapper class to use ByteArrayOutputStream with TypeSerializers.
+ */
+public class ByteArrayOutputView implements DataOutputView {
+	private final ByteArrayOutputStream byteOutputStream;
+	private final DataOutputStream outputStream;
+
+	public ByteArrayOutputView(){
+		byteOutputStream = new ByteArrayOutputStream();
+		outputStream = new DataOutputStream(byteOutputStream);
+	}
+
+	public byte[] getByteArray(){
+		return byteOutputStream.toByteArray();
+	}
+
+	public void reset() {
+		byteOutputStream.reset();
+	}
+
+	@Override
+	public void skipBytesToWrite(int numBytes) throws IOException {
+		for(int i=0; i<numBytes; i++){
+			writeByte(0);
+		}
+	}
+
+	@Override
+	public void write(DataInputView source, int numBytes) throws IOException {
+		byte[] buffer = new byte[numBytes];
+		source.readFully(buffer);
+		outputStream.write(buffer);
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		outputStream.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		outputStream.write(b, off, len);
+	}
+
+	@Override
+	public void writeBoolean(boolean v) throws IOException {
+		outputStream.writeBoolean(v);
+	}
+
+	@Override
+	public void writeByte(int v) throws IOException {
+		outputStream.writeByte(v);
+	}
+
+	@Override
+	public void writeShort(int v) throws IOException {
+		outputStream.writeShort(v);
+	}
+
+	@Override
+	public void writeChar(int v) throws IOException {
+		outputStream.writeChar(v);
+	}
+
+	@Override
+	public void writeInt(int v) throws IOException {
+		outputStream.writeInt(v);
+	}
+
+	@Override
+	public void writeLong(long v) throws IOException {
+		outputStream.writeLong(v);
+	}
+
+	@Override
+	public void writeFloat(float v) throws IOException {
+		outputStream.writeFloat(v);
+	}
+
+	@Override
+	public void writeDouble(double v) throws IOException {
+		outputStream.writeDouble(v);
+	}
+
+	@Override
+	public void writeBytes(String s) throws IOException {
+		outputStream.writeBytes(s);
+	}
+
+	@Override
+	public void writeChars(String s) throws IOException {
+		outputStream.writeChars(s);
+	}
+
+	@Override
+	public void writeUTF(String s) throws IOException {
+		outputStream.writeUTF(s);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
index 2f7aef3..1340477 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/ExecutionEnvironment.java
@@ -362,7 +362,7 @@ public abstract class ExecutionEnvironment {
 	public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
 		
-		return new DataSource<X>(this, new CollectionInputFormat<X>(data), type);
+		return new DataSource<X>(this, new CollectionInputFormat<X>(data, type.createSerializer()), type);
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index 8d051af..5e513b2 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -16,14 +16,20 @@
 package eu.stratosphere.api.java.io;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
-import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.common.io.ByteArrayInputView;
+import eu.stratosphere.api.common.io.ByteArrayOutputView;
 import eu.stratosphere.api.common.io.GenericInputFormat;
 import eu.stratosphere.api.common.io.NonParallelInput;
+import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.core.io.GenericInputSplit;
+import eu.stratosphere.core.memory.DataInputView;
 
 /**
  * An input format that returns objects from a collection.
@@ -32,16 +38,19 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	private static final long serialVersionUID = 1L;
 
-	private final Collection<T> dataSet; // input data as collection
+	private Collection<T> dataSet; // input data as collection
+
+	private TypeSerializer<T> serializer;
 
 	private transient Iterator<T> iterator;
-	
 
 	
-	public CollectionInputFormat(Collection<T> dataSet) {
+	public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer) {
 		if (dataSet == null) {
 			throw new NullPointerException();
 		}
+
+		this.serializer = serializer;
 		
 		this.dataSet = dataSet;
 	}
@@ -63,6 +72,51 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 	public T nextRecord(T record) throws IOException {
 		return this.iterator.next();
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException{
+		out.writeObject(serializer);
+		out.writeInt(dataSet.size());
+		ByteArrayOutputView outputView = new ByteArrayOutputView();
+		for(T element : dataSet){
+			serializer.serialize(element, outputView);
+		}
+
+		byte[] blob = outputView.getByteArray();
+		out.writeInt(blob.length);
+		out.write(blob);
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException{
+		try{
+			Object obj = in.readObject();
+
+			if(obj instanceof TypeSerializer<?>){
+				serializer = (TypeSerializer<T>)obj;
+			}
+		}catch(ClassNotFoundException ex){
+			throw new IOException(ex);
+		}
+
+
+		int collectionLength = in.readInt();
+		List<T> list = new ArrayList<T>(collectionLength);
+
+		int blobLength = in.readInt();
+		byte[] blob = new byte[blobLength];
+		in.readFully(blob);
+
+		DataInputView inputView = new ByteArrayInputView(blob);
+
+		for(int i=0; i< collectionLength; i++){
+			T element = serializer.createInstance();
+			element = serializer.deserialize(element, inputView);
+			list.add(element);
+		}
+
+		dataSet = list;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -78,10 +132,6 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 			throw new NullPointerException();
 		}
 		
-		if (!Serializable.class.isAssignableFrom(viewedAs)) {
-			throw new InvalidProgramException("The elements are not serializable (java.io.Serializable).");
-		}
-		
 		for (X elem : elements) {
 			if (elem == null) {
 				throw new IllegalArgumentException("The collection must not contain null elements.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5fa5e502/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
new file mode 100644
index 0000000..781e7a8
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package eu.stratosphere.api.java.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+import eu.stratosphere.core.io.GenericInputSplit;
+import eu.stratosphere.types.TypeInformation;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class CollectionInputFormatTest {
+	public static class ElementType{
+		private int id;
+
+		public ElementType(){
+			this(-1);
+		}
+
+		public ElementType(int id){
+			this.id = id;
+		}
+
+		public int getId(){return id;}
+
+		@Override
+		public boolean equals(Object obj){
+			if(obj != null && obj instanceof ElementType){
+				ElementType et = (ElementType) obj;
+
+				return et.getId() == this.getId();
+			}else {
+				return false;
+			}
+		}
+	}
+
+	@Test
+	public void testSerializability(){
+		Collection<ElementType> inputCollection = new ArrayList<ElementType>();
+		ElementType element1 = new ElementType(1);
+		ElementType element2 = new ElementType(2);
+		ElementType element3 = new ElementType(3);
+		inputCollection.add(element1);
+		inputCollection.add(element2);
+		inputCollection.add(element3);
+
+		TypeInformation<ElementType> info = (TypeInformation<ElementType>)TypeExtractor.createTypeInfo(ElementType
+				.class);
+
+		CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
+				info.createSerializer());
+
+		try{
+			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(buffer);
+
+			out.writeObject(inputFormat);
+
+			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+
+			Object serializationResult = in.readObject();
+
+			assertNotNull(serializationResult);
+			assertTrue(serializationResult instanceof CollectionInputFormat<?>);
+
+			CollectionInputFormat<ElementType> result = (CollectionInputFormat<ElementType>) serializationResult;
+
+			GenericInputSplit inputSplit = new GenericInputSplit();
+			inputFormat.open(inputSplit);
+			result.open(inputSplit);
+
+			while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+				ElementType expectedElement = inputFormat.nextRecord(null);
+				ElementType actualElement = result.nextRecord(null);
+
+				assertEquals(expectedElement, actualElement);
+			}
+		}catch(IOException ex){
+			fail(ex.toString());
+		}catch(ClassNotFoundException ex){
+			fail(ex.toString());
+		}
+	}
+}


[3/6] git commit: [FLINK-943] Remove leading and ending double quotes from 'env.java.opts' config value in startup scripts

Posted by se...@apache.org.
[FLINK-943] Remove leading and ending double quotes from 'env.java.opts' config value in startup scripts

This closes #26


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b2bd4697
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b2bd4697
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b2bd4697

Branch: refs/heads/master
Commit: b2bd4697af1984a00af3ded443b93528cf146769
Parents: 6ee874a
Author: uce <u....@fu-berlin.de>
Authored: Wed Jun 18 00:48:15 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:54:10 2014 +0200

----------------------------------------------------------------------
 stratosphere-dist/src/main/stratosphere-bin/bin/config.sh      | 3 +++
 stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh  | 2 +-
 stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh | 2 +-
 3 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
index 9168813..64f8ea7 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/config.sh
@@ -169,6 +169,9 @@ fi
 
 if [ -z "${STRATOSPHERE_ENV_JAVA_OPTS}" ]; then
     STRATOSPHERE_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+    # Remove leading and ending double quotes (if present) of value
+    STRATOSPHERE_ENV_JAVA_OPTS="$( echo "${STRATOSPHERE_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
 fi
 
 if [ -z "${STRATOSPHERE_SSH_OPTS}" ]; then

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
index e474e5e..a2ef1be 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/jobmanager.sh
@@ -72,7 +72,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting job manager
-        $JAVA_RUN $JVM_ARGS $STRATOSPHERE_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "$STRATOSPHERE_JM_CLASSPATH" eu.stratosphere.nephele.jobmanager.JobManager -executionMode $EXECUTIONMODE -configDir "$STRATOSPHERE_CONF_DIR"  > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${STRATOSPHERE_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$STRATOSPHERE_JM_CLASSPATH" eu.stratosphere.nephele.jobmanager.JobManager -executionMode $EXECUTIONMODE -configDir "$STRATOSPHERE_CONF_DIR"  > "$out" 2>&1 < /dev/null &
         echo $! > $pid
     ;;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2bd4697/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh b/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
index ea6e468..aa92b6e 100755
--- a/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/bin/taskmanager.sh
@@ -69,7 +69,7 @@ case $STARTSTOP in
         rotateLogFile $out
 
         echo Starting task manager on host $HOSTNAME
-        $JAVA_RUN $JVM_ARGS $STRATOSPHERE_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "$STRATOSPHERE_TM_CLASSPATH" eu.stratosphere.nephele.taskmanager.TaskManager -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
+        $JAVA_RUN $JVM_ARGS ${STRATOSPHERE_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "$STRATOSPHERE_TM_CLASSPATH" eu.stratosphere.nephele.taskmanager.TaskManager -configDir "$STRATOSPHERE_CONF_DIR" > "$out" 2>&1 < /dev/null &
         echo $! > $pid
     ;;
 


[2/6] git commit: Projection constructor adjusts to max tuple size

Posted by se...@apache.org.
Projection constructor adjusts to max tuple size

This closes #27


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6ee874aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6ee874aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6ee874aa

Branch: refs/heads/master
Commit: 6ee874aa05e0eeda133bcc6100cb2330e1d27154
Parents: 5c23b8f
Author: zentol <s....@web.de>
Authored: Wed Jun 18 14:06:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:51:01 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/api/java/operators/ProjectOperator.java     | 5 +++--
 .../stratosphere/api/java/operator/ProjectionOperatorTest.java  | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ee874aa/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
index 31583e9..1907393 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/ProjectOperator.java
@@ -72,8 +72,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			
 			if(fieldIndexes.length == 0) {
 				throw new IllegalArgumentException("project() needs to select at least one (1) field.");
-			} else if(fieldIndexes.length > 22) {
-				throw new IllegalArgumentException("project() may select only up to twenty-two (22) fields.");
+			} else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
+				throw new IllegalArgumentException(
+						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
 			}
 			
 			int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ee874aa/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
index a5ac562..ac496c2 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/ProjectionOperatorTest.java
@@ -62,7 +62,7 @@ public class ProjectionOperatorTest {
 		
 		// should not work: too many fields
 		try {
-			tupleDs.project(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22);
+			tupleDs.project(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25);
 			Assert.fail();
 		} catch(IllegalArgumentException iae) {
 			// we're good here


[6/6] git commit: [FLINK-944] Consolidated serialization logic between different classes. Fixes for warnings.

Posted by se...@apache.org.
[FLINK-944] Consolidated serialization logic between different classes. Fixes for warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9ecb6df7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9ecb6df7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9ecb6df7

Branch: refs/heads/master
Commit: 9ecb6df76ca5e7a42d0d62b1c9dca41e9e7ecd1b
Parents: 5fa5e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 19:21:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 20:51:52 2014 +0200

----------------------------------------------------------------------
 .../api/common/io/ByteArrayInputView.java       | 114 -----------------
 .../api/common/io/ByteArrayOutputView.java      | 126 -------------------
 .../core/memory/InputViewDataInputWrapper.java  | 113 +++++++++++++++++
 .../memory/OutputViewDataOutputWrapper.java     | 116 +++++++++++++++++
 .../java/graph/TransitiveClosureNaive.java      |   2 +-
 .../api/java/io/CollectionInputFormat.java      |  49 +++-----
 .../api/java/io/CollectionInputFormatTest.java  |  11 +-
 .../plugable/DeserializationDelegate.java       | 102 +--------------
 .../runtime/plugable/SerializationDelegate.java | 106 +---------------
 .../CoGroupConnectedComponentsSecondITCase.java |   4 -
 10 files changed, 262 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
deleted file mode 100644
index 7e65f13..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayInputView.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package eu.stratosphere.api.common.io;
-
-import eu.stratosphere.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-/**
- * Wrapper to use ByteArrayInputStream with TypeSerializers
- */
-public class ByteArrayInputView implements DataInputView{
-
-	private final ByteArrayInputStream byteArrayInputStream;
-	private final DataInputStream inputStream;
-
-	public ByteArrayInputView(byte[] buffer){
-		byteArrayInputStream = new ByteArrayInputStream(buffer);
-		inputStream = new DataInputStream(byteArrayInputStream);
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		inputStream.skipBytes(numBytes);
-	}
-
-	@Override
-	public void readFully(byte[] b) throws IOException {
-		inputStream.readFully(b);
-	}
-
-	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
-		inputStream.readFully(b, off, len);
-	}
-
-	@Override
-	public int skipBytes(int n) throws IOException {
-		return inputStream.skipBytes(n);
-	}
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return inputStream.readBoolean();
-	}
-
-	@Override
-	public byte readByte() throws IOException {
-		return inputStream.readByte();
-	}
-
-	@Override
-	public int readUnsignedByte() throws IOException {
-		return inputStream.readUnsignedByte();
-	}
-
-	@Override
-	public short readShort() throws IOException {
-		return inputStream.readShort();
-	}
-
-	@Override
-	public int readUnsignedShort() throws IOException {
-		return inputStream.readUnsignedShort();
-	}
-
-	@Override
-	public char readChar() throws IOException {
-		return inputStream.readChar();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return inputStream.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return inputStream.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return inputStream.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return inputStream.readDouble();
-	}
-
-	@Override
-	public String readLine() throws IOException {
-		return inputStream.readLine();
-	}
-
-	@Override
-	public String readUTF() throws IOException {
-		return inputStream.readUTF();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
deleted file mode 100644
index b96338f..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/io/ByteArrayOutputView.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package eu.stratosphere.api.common.io;
-
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.core.memory.DataOutputView;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Wrapper class to use ByteArrayOutputStream with TypeSerializers.
- */
-public class ByteArrayOutputView implements DataOutputView {
-	private final ByteArrayOutputStream byteOutputStream;
-	private final DataOutputStream outputStream;
-
-	public ByteArrayOutputView(){
-		byteOutputStream = new ByteArrayOutputStream();
-		outputStream = new DataOutputStream(byteOutputStream);
-	}
-
-	public byte[] getByteArray(){
-		return byteOutputStream.toByteArray();
-	}
-
-	public void reset() {
-		byteOutputStream.reset();
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		for(int i=0; i<numBytes; i++){
-			writeByte(0);
-		}
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		byte[] buffer = new byte[numBytes];
-		source.readFully(buffer);
-		outputStream.write(buffer);
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		outputStream.write(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		outputStream.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		outputStream.write(b, off, len);
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		outputStream.writeBoolean(v);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		outputStream.writeByte(v);
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		outputStream.writeShort(v);
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		outputStream.writeChar(v);
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		outputStream.writeInt(v);
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		outputStream.writeLong(v);
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		outputStream.writeFloat(v);
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		outputStream.writeDouble(v);
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		outputStream.writeBytes(s);
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		outputStream.writeChars(s);
-	}
-
-	@Override
-	public void writeUTF(String s) throws IOException {
-		outputStream.writeUTF(s);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
new file mode 100644
index 0000000..c2451ef
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/InputViewDataInputWrapper.java
@@ -0,0 +1,113 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.core.memory;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * A utility that presents a {@link DataInput} as a {@link DataInputView}.
+ */
+public class InputViewDataInputWrapper implements DataInputView {
+	
+	private DataInput delegate;
+	
+	public void setDelegate(DataInput delegate) {
+		this.delegate = delegate;
+	}
+
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		this.delegate.readFully(b);
+	}
+
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		this.delegate.readFully(b, off, len);
+	}
+
+	@Override
+	public int skipBytes(int n) throws IOException {
+		return this.delegate.skipBytes(n);
+	}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return this.delegate.readBoolean();
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		return this.delegate.readByte();
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		return this.delegate.readUnsignedByte();
+	}
+
+	@Override
+	public short readShort() throws IOException {
+		return this.delegate.readShort();
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+		return this.delegate.readUnsignedShort();
+	}
+
+	@Override
+	public char readChar() throws IOException {
+		return this.delegate.readChar();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return this.delegate.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return this.delegate.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return this.delegate.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return this.delegate.readDouble();
+	}
+
+	@Override
+	public String readLine() throws IOException {
+		return this.delegate.readLine();
+	}
+
+	@Override
+	public String readUTF() throws IOException {
+		return this.delegate.readUTF();
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		for (int i = 0; i < numBytes; i++) {
+			this.delegate.readByte();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
new file mode 100644
index 0000000..7bb8f8c
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
@@ -0,0 +1,116 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.core.memory;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A utility that presents a {@link DataOutput} as a {@link DataOutputView}.
+ */
+public class OutputViewDataOutputWrapper implements DataOutputView {
+	
+	private DataOutput delegate;
+	
+	public void setDelegate(DataOutput delegate) {
+		this.delegate = delegate;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		this.delegate.write(b);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		this.delegate.write(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		this.delegate.write(b, off, len);
+	}
+
+	@Override
+	public void writeBoolean(boolean v) throws IOException {
+		this.delegate.writeBoolean(v);
+	}
+
+	@Override
+	public void writeByte(int v) throws IOException {
+		this.delegate.writeByte(v);
+	}
+
+	@Override
+	public void writeShort(int v) throws IOException {
+		this.delegate.writeShort(v);
+	}
+
+	@Override
+	public void writeChar(int v) throws IOException {
+		this.delegate.writeChar(v);
+	}
+
+	@Override
+	public void writeInt(int v) throws IOException {
+		this.delegate.writeInt(v);
+	}
+
+	@Override
+	public void writeLong(long v) throws IOException {
+		this.delegate.writeLong(v);
+	}
+
+	@Override
+	public void writeFloat(float v) throws IOException {
+		this.delegate.writeFloat(v);
+	}
+
+	@Override
+	public void writeDouble(double v) throws IOException {
+		this.delegate.writeDouble(v);
+	}
+
+	@Override
+	public void writeBytes(String s) throws IOException {
+		this.delegate.writeBytes(s);
+	}
+
+	@Override
+	public void writeChars(String s) throws IOException {
+		this.delegate.writeChars(s);
+	}
+
+	@Override
+	public void writeUTF(String s) throws IOException {
+		this.delegate.writeUTF(s);
+	}
+
+	@Override
+	public void skipBytesToWrite(int numBytes) throws IOException {
+		// skip by writing zeros.
+		for (int i = 0; i < numBytes; i++) {
+			this.delegate.writeByte(0);
+		}
+	}
+
+	@Override
+	public void write(DataInputView source, int numBytes) throws IOException {
+		for (int i = 0; i < numBytes; i++) {
+			this.delegate.writeByte(source.readByte());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
index 3fd1627..79585ec 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
@@ -25,7 +25,7 @@ import eu.stratosphere.util.Collector;
 
 import java.util.Iterator;
 
-
+@SuppressWarnings("serial")
 public class TransitiveClosureNaive implements ProgramDescription {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index 5e513b2..fd5ae36 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -23,13 +23,12 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import eu.stratosphere.api.common.io.ByteArrayInputView;
-import eu.stratosphere.api.common.io.ByteArrayOutputView;
 import eu.stratosphere.api.common.io.GenericInputFormat;
 import eu.stratosphere.api.common.io.NonParallelInput;
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.core.io.GenericInputSplit;
-import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.InputViewDataInputWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
 
 /**
  * An input format that returns objects from a collection.
@@ -75,43 +74,35 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 
 	// --------------------------------------------------------------------------------------------
 
-	private void writeObject(ObjectOutputStream out) throws IOException{
+	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.writeObject(serializer);
 		out.writeInt(dataSet.size());
-		ByteArrayOutputView outputView = new ByteArrayOutputView();
-		for(T element : dataSet){
-			serializer.serialize(element, outputView);
+		
+		OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper();
+		outWrapper.setDelegate(out);
+		
+		for (T element : dataSet){
+			serializer.serialize(element, outWrapper);
 		}
-
-		byte[] blob = outputView.getByteArray();
-		out.writeInt(blob.length);
-		out.write(blob);
 	}
 
-	private void readObject(ObjectInputStream in) throws IOException{
-		try{
-			Object obj = in.readObject();
-
-			if(obj instanceof TypeSerializer<?>){
-				serializer = (TypeSerializer<T>)obj;
-			}
-		}catch(ClassNotFoundException ex){
-			throw new IOException(ex);
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException {
+		try {
+			this.serializer = (TypeSerializer<T>) in.readObject();
+		} catch (ClassNotFoundException ex){
+			throw new IOException("Could not load the serializer class.", ex);
 		}
 
-
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
+		
+		InputViewDataInputWrapper inWrapper = new InputViewDataInputWrapper();
+		inWrapper.setDelegate(in);
 
-		int blobLength = in.readInt();
-		byte[] blob = new byte[blobLength];
-		in.readFully(blob);
-
-		DataInputView inputView = new ByteArrayInputView(blob);
-
-		for(int i=0; i< collectionLength; i++){
+		for (int i = 0; i < collectionLength; i++){
 			T element = serializer.createInstance();
-			element = serializer.deserialize(element, inputView);
+			element = serializer.deserialize(element, inWrapper);
 			list.add(element);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
index 781e7a8..4388c9c 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
@@ -1,4 +1,4 @@
-/*
+/***********************************************************************************************************************
  * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
@@ -9,7 +9,7 @@
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
- */
+ **********************************************************************************************************************/
 
 package eu.stratosphere.api.java.io;
 
@@ -17,10 +17,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import eu.stratosphere.api.java.typeutils.TypeExtractor;
 import eu.stratosphere.core.io.GenericInputSplit;
 import eu.stratosphere.types.TypeInformation;
+
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -67,8 +67,8 @@ public class CollectionInputFormatTest {
 		inputCollection.add(element2);
 		inputCollection.add(element3);
 
-		TypeInformation<ElementType> info = (TypeInformation<ElementType>)TypeExtractor.createTypeInfo(ElementType
-				.class);
+		@SuppressWarnings("unchecked")
+		TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
 
 		CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
 				info.createSerializer());
@@ -86,6 +86,7 @@ public class CollectionInputFormatTest {
 			assertNotNull(serializationResult);
 			assertTrue(serializationResult instanceof CollectionInputFormat<?>);
 
+			@SuppressWarnings("unchecked")
 			CollectionInputFormat<ElementType> result = (CollectionInputFormat<ElementType>) serializationResult;
 
 			GenericInputSplit inputSplit = new GenericInputSplit();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
index a4958d6..00ac913 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/DeserializationDelegate.java
@@ -19,7 +19,7 @@ import java.io.IOException;
 
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.InputViewDataInputWrapper;
 
 
 public class DeserializationDelegate<T> implements IOReadableWritable {
@@ -28,12 +28,12 @@ public class DeserializationDelegate<T> implements IOReadableWritable {
 	
 	private final TypeSerializer<T> serializer;
 	
-	private final InputViewWrapper wrapper;
+	private final InputViewDataInputWrapper wrapper;
 	
 	
 	public DeserializationDelegate(TypeSerializer<T> serializer) {
 		this.serializer = serializer;
-		this.wrapper = new InputViewWrapper();
+		this.wrapper = new InputViewDataInputWrapper();
 	}
 	
 	public void setInstance(T instance) {
@@ -54,100 +54,4 @@ public class DeserializationDelegate<T> implements IOReadableWritable {
 		this.wrapper.setDelegate(in);
 		this.instance = this.serializer.deserialize(this.instance, this.wrapper);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Utility class that wraps a {@link DataInput} as a {@link DataInputView}.
-	 */
-	private static final class InputViewWrapper implements DataInputView {
-		
-		private DataInput delegate;
-		
-		public void setDelegate(DataInput delegate) {
-			this.delegate = delegate;
-		}
-
-		@Override
-		public void readFully(byte[] b) throws IOException {
-			this.delegate.readFully(b);
-		}
-
-		@Override
-		public void readFully(byte[] b, int off, int len) throws IOException {
-			this.delegate.readFully(b, off, len);
-		}
-
-		@Override
-		public int skipBytes(int n) throws IOException {
-			return this.delegate.skipBytes(n);
-		}
-
-		@Override
-		public boolean readBoolean() throws IOException {
-			return this.delegate.readBoolean();
-		}
-
-		@Override
-		public byte readByte() throws IOException {
-			return this.delegate.readByte();
-		}
-
-		@Override
-		public int readUnsignedByte() throws IOException {
-			return this.delegate.readUnsignedByte();
-		}
-
-		@Override
-		public short readShort() throws IOException {
-			return this.delegate.readShort();
-		}
-
-		@Override
-		public int readUnsignedShort() throws IOException {
-			return this.delegate.readUnsignedShort();
-		}
-
-		@Override
-		public char readChar() throws IOException {
-			return this.delegate.readChar();
-		}
-
-		@Override
-		public int readInt() throws IOException {
-			return this.delegate.readInt();
-		}
-
-		@Override
-		public long readLong() throws IOException {
-			return this.delegate.readLong();
-		}
-
-		@Override
-		public float readFloat() throws IOException {
-			return this.delegate.readFloat();
-		}
-
-		@Override
-		public double readDouble() throws IOException {
-			return this.delegate.readDouble();
-		}
-
-		@Override
-		public String readLine() throws IOException {
-			return this.delegate.readLine();
-		}
-
-		@Override
-		public String readUTF() throws IOException {
-			return this.delegate.readUTF();
-		}
-
-		@Override
-		public void skipBytesToRead(int numBytes) throws IOException {
-			for (int i = 0; i < numBytes; i++) {
-				this.delegate.readByte();
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
index 1c54f86..7e940cf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/plugable/SerializationDelegate.java
@@ -19,8 +19,7 @@ import java.io.IOException;
 
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.core.memory.DataInputView;
-import eu.stratosphere.core.memory.DataOutputView;
+import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
 
 
 public class SerializationDelegate<T> implements IOReadableWritable {
@@ -29,12 +28,12 @@ public class SerializationDelegate<T> implements IOReadableWritable {
 	
 	private final TypeSerializer<T> serializer;
 	
-	private final OutputViewWrapper wrapper;
+	private final OutputViewDataOutputWrapper wrapper;
 	
 	
 	public SerializationDelegate(TypeSerializer<T> serializer) {
 		this.serializer = serializer;
-		this.wrapper = new OutputViewWrapper();
+		this.wrapper = new OutputViewDataOutputWrapper();
 	}
 	
 	public void setInstance(T instance) {
@@ -56,103 +55,4 @@ public class SerializationDelegate<T> implements IOReadableWritable {
 	public void read(DataInput in) throws IOException {
 		throw new IllegalStateException("Deserialization method called on SerializationDelegate.");
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Utility class that wraps a {@link DataOutput} as a {@link DataOutputView}.
-	 */
-	private static final class OutputViewWrapper implements DataOutputView {
-		
-		private DataOutput delegate;
-		
-		public void setDelegate(DataOutput delegate) {
-			this.delegate = delegate;
-		}
-
-		@Override
-		public void write(int b) throws IOException {
-			this.delegate.write(b);
-		}
-
-		@Override
-		public void write(byte[] b) throws IOException {
-			this.delegate.write(b);
-		}
-
-		@Override
-		public void write(byte[] b, int off, int len) throws IOException {
-			this.delegate.write(b, off, len);
-		}
-
-		@Override
-		public void writeBoolean(boolean v) throws IOException {
-			this.delegate.writeBoolean(v);
-		}
-
-		@Override
-		public void writeByte(int v) throws IOException {
-			this.delegate.writeByte(v);
-		}
-
-		@Override
-		public void writeShort(int v) throws IOException {
-			this.delegate.writeShort(v);
-		}
-
-		@Override
-		public void writeChar(int v) throws IOException {
-			this.delegate.writeChar(v);
-		}
-
-		@Override
-		public void writeInt(int v) throws IOException {
-			this.delegate.writeInt(v);
-		}
-
-		@Override
-		public void writeLong(long v) throws IOException {
-			this.delegate.writeLong(v);
-		}
-
-		@Override
-		public void writeFloat(float v) throws IOException {
-			this.delegate.writeFloat(v);
-		}
-
-		@Override
-		public void writeDouble(double v) throws IOException {
-			this.delegate.writeDouble(v);
-		}
-
-		@Override
-		public void writeBytes(String s) throws IOException {
-			this.delegate.writeBytes(s);
-		}
-
-		@Override
-		public void writeChars(String s) throws IOException {
-			this.delegate.writeChars(s);
-		}
-
-		@Override
-		public void writeUTF(String s) throws IOException {
-			this.delegate.writeUTF(s);
-		}
-
-		@Override
-		public void skipBytesToWrite(int numBytes) throws IOException {
-			// skip by writing zeros.
-			for (int i = 0; i < numBytes; i++) {
-				this.delegate.writeByte(0);
-			}
-		}
-
-		@Override
-		public void write(DataInputView source, int numBytes) throws IOException {
-			for (int i = 0; i < numBytes; i++) {
-				this.delegate.writeByte(source.readByte());
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9ecb6df7/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 1a0f443..103effa 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -32,8 +32,6 @@ import eu.stratosphere.example.java.graph.ConnectedComponents.NeighborWithCompon
 import eu.stratosphere.test.testdata.ConnectedComponentsData;
 import eu.stratosphere.test.util.JavaProgramTestBase;
 import eu.stratosphere.util.Collector;
-import eu.stratosphere.util.LogUtils;
-
 
 @SuppressWarnings("serial")
 public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
@@ -51,8 +49,6 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 		// set up execution environment
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
-		LogUtils.initializeDefaultConsoleLogger();
-		
 		// read vertex and edge data
 		DataSet<Long> vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n"))
 				.map(new VertexParser());


[4/6] git commit: Transitive closure example in Java API

Posted by se...@apache.org.
Transitive closure example in Java API

This closes #24


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f3930e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f3930e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f3930e3d

Branch: refs/heads/master
Commit: f3930e3dba63d98f2b0fc7986fb507f9afa8283a
Parents: b2bd469
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Fri Jun 6 17:22:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:57:22 2014 +0200

----------------------------------------------------------------------
 .../stratosphere-java-examples/pom.xml          |  26 +++-
 .../java/graph/TransitiveClosureNaive.java      | 132 +++++++++++++++++++
 .../test/testdata/TransitiveClosureData.java    |  44 +++++++
 .../TransitiveClosureITCase.java                |  54 ++++++++
 4 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/pom.xml b/stratosphere-examples/stratosphere-java-examples/pom.xml
index 3fd4c9b..705c46c 100644
--- a/stratosphere-examples/stratosphere-java-examples/pom.xml
+++ b/stratosphere-examples/stratosphere-java-examples/pom.xml
@@ -57,7 +57,31 @@
 							</includes>
 						</configuration>
 					</execution>
-					
+
+                    <!-- Transitive Closure -->
+                    <execution>
+                        <id>TransitiveClosure</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <classifier>TransitiveClosure</classifier>
+
+                            <archive>
+                                <manifestEntries>
+                                    <program-class>eu.stratosphere.example.java.graph.TransitiveClosureNaive</program-class>
+                                </manifestEntries>
+                            </archive>
+
+                            <includes>
+                                <include>**/java/graph/TransitiveClosureNaive.class</include>
+                                <include>**/java/graph/TransitiveClosureNaive$*.class</include>
+                                <include>**/java/graph/util/ConnectedComponentsData.class</include>
+                            </includes>
+                        </configuration>
+                    </execution>
+
 					<!-- Connected Components -->
 					<execution>
 						<id>ConnectedComponents</id>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
new file mode 100644
index 0000000..3fd1627
--- /dev/null
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/TransitiveClosureNaive.java
@@ -0,0 +1,132 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.example.java.graph;
+
+import eu.stratosphere.api.common.ProgramDescription;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.example.java.graph.util.ConnectedComponentsData;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+
+public class TransitiveClosureNaive implements ProgramDescription {
+
+
+	public static void main (String... args) throws Exception{
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
+
+		IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);
+
+		DataSet<Tuple2<Long,Long>> nextPaths = paths
+				.join(edges)
+				.where(1)
+				.equalTo(0)
+				.with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					/**
+						left: Path (z,x) - x is reachable by z
+						right: Edge (x,y) - edge x-->y exists
+						out: Path (z,y) - y is reachable by z
+					 */
+					public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
+						return new Tuple2<Long, Long>(
+								new Long(left.f0),
+								new Long(right.f1));
+					}
+				})
+				.union(paths)
+				.groupBy(0, 1)
+				.reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+					@Override
+					public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
+						out.collect(values.next());
+					}
+				});
+
+		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+
+
+		// emit result
+		if (fileOutput) {
+			transitiveClosure.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			transitiveClosure.print();
+		}
+
+		// execute program
+		env.execute("Transitive Closure Example");
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <edges-path> <result-path> <max-number-of-iterations>";
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] programArguments) {
+
+		if (programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (programArguments.length == 3) {
+				edgesPath = programArguments[0];
+				outputPath = programArguments[1];
+				maxIterations = Integer.parseInt(programArguments[2]);
+			} else {
+				System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: TransitiveClosure <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+
+
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
new file mode 100644
index 0000000..f39216a
--- /dev/null
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/TransitiveClosureData.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.testdata;
+
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class TransitiveClosureData {
+
+	public static void checkOddEvenResult(BufferedReader result) throws IOException {
+		Pattern split = Pattern.compile(" ");
+		String line;
+		while ((line = result.readLine()) != null) {
+			String[] res = split.split(line);
+			Assert.assertEquals("Malformed result: Wrong number of tokens in line.", 2, res.length);
+			try {
+				int from = Integer.parseInt(res[0]);
+				int to = Integer.parseInt(res[1]);
+
+				Assert.assertEquals("Vertex should not be reachable.", from % 2, to % 2);
+			} catch (NumberFormatException e) {
+				Assert.fail("Malformed result.");
+			}
+		}
+	}
+
+	private TransitiveClosureData() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f3930e3d/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
new file mode 100644
index 0000000..96761c8
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/exampleJavaPrograms/TransitiveClosureITCase.java
@@ -0,0 +1,54 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.exampleJavaPrograms;
+
+
+import eu.stratosphere.example.java.graph.TransitiveClosureNaive;
+import eu.stratosphere.test.testdata.ConnectedComponentsData;
+import eu.stratosphere.test.testdata.TransitiveClosureData;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+public class TransitiveClosureITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+
+	private static final int NUM_VERTICES = 1000;
+
+	private static final int NUM_EDGES = 10000;
+
+	private String edgesPath;
+	private String resultPath;
+
+
+	@Override
+	protected void preSubmit() throws Exception {
+		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TransitiveClosureNaive.main(edgesPath, resultPath, "100");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			TransitiveClosureData.checkOddEvenResult(reader);
+		}
+	}
+}
+