You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:38:06 UTC

[4/5] incubator-flink git commit: [FLINK-610] Replace Avro by Kryo as the GenericType serializer

[FLINK-610] Replace Avro by Kryo as the GenericType serializer

Data-intensive jobs using Kryo are probably going to be slow.

Set correct classloader

Try to use Kryo.copy(), with a fallback to copying via serialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a70aa67a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a70aa67a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a70aa67a

Branch: refs/heads/master
Commit: a70aa67a0881afc5d66329d46fe536d7a0b89fa8
Parents: 1c1c83b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Dec 16 11:30:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 20:44:14 2014 +0100

----------------------------------------------------------------------
 .../common/typeutils/SerializerTestBase.java    |   5 +-
 .../typeutils/SerializerTestInstance.java       |   2 -
 flink-java/pom.xml                              |   6 +
 .../api/java/typeutils/GenericTypeInfo.java     |  19 +--
 .../flink/api/java/typeutils/TypeExtractor.java |  17 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  58 ++++---
 .../type/extractor/PojoTypeExtractionTest.java  | 166 +++++++++++--------
 .../AbstractGenericTypeSerializerTest.java      |   2 +-
 .../runtime/KryoGenericTypeComparatorTest.java  |   2 +-
 .../runtime/KryoGenericTypeSerializerTest.java  |  44 ++++-
 flink-scala/pom.xml                             |   8 +
 .../runtime/KryoGenericTypeSerializerTest.scala | 128 ++++++++++++++
 .../javaApiOperators/GroupReduceITCase.java     |  71 +++++++-
 .../util/CollectionDataSets.java                |  77 +++++++++
 14 files changed, 481 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index d509284..5122af9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -59,7 +59,10 @@ public abstract class SerializerTestBase<T> {
 	public void testInstantiate() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
-			
+			if(serializer.getClass().getName().endsWith("KryoSerializer")) {
+				// the kryo serializer will return null. We ignore this test for Kryo.
+				return;
+			}
 			T instance = serializer.createInstance();
 			assertNotNull("The created instance must not be null.", instance);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index c48e879..7f65995 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
 
 public class SerializerTestInstance<T> extends SerializerTestBase<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 3b80b3e..6865cc9 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -58,6 +58,12 @@ under the License.
 			<artifactId>asm</artifactId>
 		</dependency>
 		
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>chill_2.10</artifactId>
+			<version>0.5.1</version>
+		</dependency>
+
 		<!--  guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
 		<dependency>
 			<groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 8a1406b..5bc6cb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,26 +22,16 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-
-import java.util.Collection;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 
 
 public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
 	private final Class<T> typeClass;
-	private final static Class<?>[] unsupportedByAvro = new Class[] {Collection.class};
-	
+
 	public GenericTypeInfo(Class<T> typeClass) {
 		this.typeClass = typeClass;
-		for (Class<?> unsupported: unsupportedByAvro) {
-			if(unsupported.isAssignableFrom(typeClass)) {
-				throw new RuntimeException("The type '"+typeClass+"' is currently not supported " +
-						"by the Avro Serializer that Flink is using for serializing " +
-						"arbitrary objects");
-			}
-		}
 	}
 	
 	@Override
@@ -76,10 +66,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public TypeSerializer<T> createSerializer() {
-		// NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here
-		// in particular classes implementing GenericContainer are handled as GenericTypeInfos 
-		// (this will probably not work with Kryo)
-		return new AvroSerializer<T>(this.typeClass);
+		return new KryoSerializer<T>(this.typeClass);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 3bceac5..e52e2af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.avro.generic.GenericContainer;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -947,16 +946,16 @@ public class TypeExtractor {
 			// special case handling for Class, this should not be handled by the POJO logic
 			return new GenericTypeInfo<X>(clazz);
 		}
-		if(GenericContainer.class.isAssignableFrom(clazz)) {
-			// this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro.
-			return new GenericTypeInfo<X>(clazz);
-		}
+
 		try {
 			TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
 			if (pojoType != null) {
 				return pojoType;
 			}
 		} catch (InvalidTypesException e) {
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e);
+			}
 			// ignore and create generic type info
 		}
 
@@ -1051,9 +1050,11 @@ public class TypeExtractor {
 				fieldTypeHierarchy.add(fieldType);
 				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
 			} catch (InvalidTypesException e) {
-				//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
-				throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""
-						+ "\nThe system is internally using the Avro serializer which is not able to handle that type.", e);
+				Class<?> genericClass = Object.class;
+				if(isClassType(fieldType)) {
+					genericClass = typeToClass(fieldType);
+				}
+				pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass )));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 7a98abf..f2c5848 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -19,20 +19,23 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
+import com.twitter.chill.ScalaKryoInstantiator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 public class KryoSerializer<T> extends TypeSerializer<T> {
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 2L;
 
 	private final Class<T> type;
-	private final Class<? extends T> typeToInstantiate;
 
 	private transient Kryo kryo;
 	private transient T copyInstance;
@@ -44,21 +47,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private transient Output output;
 
 	public KryoSerializer(Class<T> type){
-		this(type,type);
-	}
-
-	public KryoSerializer(Class<T> type, Class<? extends T> typeToInstantiate){
-		if(type == null || typeToInstantiate == null){
+		if(type == null){
 			throw new NullPointerException("Type class cannot be null.");
 		}
-
 		this.type = type;
-		this.typeToInstantiate = typeToInstantiate;
-		kryo = new Kryo();
-		kryo.setAsmEnabled(true);
-		kryo.register(type);
 	}
 
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -71,20 +66,36 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	@Override
 	public T createInstance() {
-		checkKryoInitialized();
-		return kryo.newInstance(typeToInstantiate);
+		return null;
 	}
 
 	@Override
 	public T copy(T from) {
+		if(from == null) {
+			return null;
+		}
 		checkKryoInitialized();
-		return kryo.copy(from);
+		try {
+			return kryo.copy(from);
+		} catch(KryoException ke) {
+			// kryo was unable to copy it, so we do it through serialization:
+			ByteArrayOutputStream baout = new ByteArrayOutputStream();
+			Output output = new Output(baout);
+
+			kryo.writeObject(output, from);
+
+			output.flush();
+
+			ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+			Input input = new Input(bain);
+
+			return (T)kryo.readObject(input, from.getClass());
+		}
 	}
 	
 	@Override
 	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-		return kryo.copy(from);
+		return copy(from);
 	}
 
 	@Override
@@ -101,7 +112,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			previousOut = target;
 		}
 		
-		kryo.writeObject(output, record);
+		kryo.writeClassAndObject(output, record);
 		output.flush();
 	}
 
@@ -113,7 +124,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			input = new NoFetchingInput(inputStream);
 			previousIn = source;
 		}
-		return kryo.readObject(input, typeToInstantiate);
+		return (T) kryo.readClassAndObject(input);
 	}
 	
 	@Override
@@ -136,14 +147,14 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return type.hashCode() + 31 * typeToInstantiate.hashCode();
+		return type.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
 		if (obj != null && obj instanceof KryoSerializer) {
 			KryoSerializer<?> other = (KryoSerializer<?>) obj;
-			return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+			return other.type == this.type;
 		} else {
 			return false;
 		}
@@ -153,9 +164,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
-			this.kryo = new Kryo();
-			this.kryo.setAsmEnabled(true);
+			this.kryo = new ScalaKryoInstantiator().newKryo();
+			this.kryo.setRegistrationRequired(false);
 			this.kryo.register(type);
+			this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index e5ac1ca..893e63c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -45,7 +45,7 @@ import com.google.common.collect.HashMultiset;
 
 /**
  *  Pojo Type tests
- *  
+ *
  *  A Pojo is a bean-style class with getters, setters and empty ctor
  *   OR a class with all fields public (or for every private field, there has to be a public getter/setter)
  *   everything else is a generic type (that can't be used for field selection)
@@ -55,12 +55,12 @@ public class PojoTypeExtractionTest {
 	public static class HasDuplicateField extends WC {
 		private int count; // duplicate
 	}
-	
+
 	@Test(expected=RuntimeException.class)
 	public void testDuplicateFieldException() {
 		TypeExtractor.createTypeInfo(HasDuplicateField.class);
 	}
-	
+
 	// test with correct pojo types
 	public static class WC { // is a pojo
 		public ComplexNestedClass complex; // is a pojo
@@ -84,6 +84,7 @@ public class PojoTypeExtractionTest {
 		public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
 		public Object nothing; // generic type
 		public MyWritable hadoopCitizen;  // writableType
+		public List<String> collection;
 	}
 
 	// all public test
@@ -92,7 +93,7 @@ public class PojoTypeExtractionTest {
 		public HashMultiset<Integer> fancyIds; // generic type
 		public String[]	fancyArray;			 // generic type
 	}
-	
+
 	public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
 		public String field3;
 	}
@@ -101,16 +102,16 @@ public class PojoTypeExtractionTest {
 		public T1 field1;
 		public T2 field2;
 	}
-	
+
 	public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}
 	public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}
-	
+
 	// extends from Tuple and adds a field
 	public static class FromTuple extends Tuple3<String, String, Long> {
 		private static final long serialVersionUID = 1L;
 		public int special;
 	}
-	
+
 	public static class IncorrectPojo {
 		private int isPrivate;
 		public int getIsPrivate() {
@@ -118,7 +119,7 @@ public class PojoTypeExtractionTest {
 		}
 		// setter is missing (intentional)
 	}
-	
+
 	// correct pojo
 	public static class BeanStylePojo {
 		public String abc;
@@ -136,7 +137,7 @@ public class PojoTypeExtractionTest {
 			this.a = a;
 		}
 	}
-	
+
 	// in this test, the location of the getters and setters is mixed across the type hierarchy.
 	public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
 		public void setPackageProtected(String in) {
@@ -149,50 +150,64 @@ public class PojoTypeExtractionTest {
 			return packageProtected;
 		}
 	}
-	
+
 	@Test
 	public void testIncorrectPojos() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
 		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-		
+
 		typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
 		Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
 	}
-	
+
 	@Test
 	public void testCorrectPojos() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-		
+
 		typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
 	}
-	
+
 	@Test
 	public void testPojoWC() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
 		checkWCPojoAsserts(typeForClass);
-		
+
 		WC t = new WC();
 		t.complex = new ComplexNestedClass();
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
 		checkWCPojoAsserts(typeForObject);
 	}
-	
+
 	private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
 		Assert.assertFalse(typeInfo.isBasicType());
 		Assert.assertFalse(typeInfo.isTupleType());
-		Assert.assertEquals(9, typeInfo.getTotalFields());
+		Assert.assertEquals(10, typeInfo.getTotalFields());
 		Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
 		PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
-		
+
 		List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
-		String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing",
-				"complex.someFloat", "complex.someNumber", "complex.word.f0",
-				"complex.word.f1", "complex.word.f2"};
-		int[] positions = {8,0,1,2,
-				3,4,5,
-				6,7};
+		String[] fields = {"count",
+				"complex.date",
+				"complex.hadoopCitizen",
+				"complex.collection",
+				"complex.nothing",
+				"complex.someFloat",
+				"complex.someNumber",
+				"complex.word.f0",
+				"complex.word.f1",
+				"complex.word.f2"};
+		int[] positions = {9,
+				1,
+				2,
+				0,
+				3,
+				4,
+				5,
+				6,
+				7,
+				8};
 		Assert.assertEquals(fields.length, positions.length);
 		for(int i = 0; i < fields.length; i++) {
 			pojoType.getKey(fields[i], 0, ffd);
@@ -200,86 +215,93 @@ public class PojoTypeExtractionTest {
 			Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
 			ffd.clear();
 		}
-		
+
 		pojoType.getKey("complex.word.*", 0, ffd);
 		Assert.assertEquals(3, ffd.size());
 		// check if it returns 5,6,7
 		for(FlatFieldDescriptor ffdE : ffd) {
 			final int pos = ffdE.getPosition();
-			Assert.assertTrue(pos <= 7 );
-			Assert.assertTrue(5 <= pos );
-			if(pos == 5) {
-				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
-			}
+			Assert.assertTrue(pos <= 8 );
+			Assert.assertTrue(6 <= pos );
 			if(pos == 6) {
 				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
 				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
 			}
 		}
 		ffd.clear();
-		
+
 		// scala style full tuple selection for pojos
 		pojoType.getKey("complex.word._", 0, ffd);
 		Assert.assertEquals(3, ffd.size());
 		ffd.clear();
-		
+
 		pojoType.getKey("complex.*", 0, ffd);
-		Assert.assertEquals(8, ffd.size());
+		Assert.assertEquals(9, ffd.size());
 		// check if it returns 0-7
 		for(FlatFieldDescriptor ffdE : ffd) {
 			final int pos = ffdE.getPosition();
-			Assert.assertTrue(ffdE.getPosition() <= 7 );
+			Assert.assertTrue(ffdE.getPosition() <= 8 );
 			Assert.assertTrue(0 <= ffdE.getPosition() );
+
 			if(pos == 0) {
-				Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(List.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 1) {
-				Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 2) {
-				Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 3) {
-				Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 4) {
-				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 5) {
-				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 6) {
 				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
 			}
 			if(pos == 7) {
+				Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+			}
+			if(pos == 8) {
 				Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
 			}
+			if(pos == 9) {
+				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+			}
 		}
 		ffd.clear();
-		
+
 		pojoType.getKey("*", 0, ffd);
-		Assert.assertEquals(9, ffd.size());
+		Assert.assertEquals(10, ffd.size());
 		// check if it returns 0-8
 		for(FlatFieldDescriptor ffdE : ffd) {
-			Assert.assertTrue(ffdE.getPosition() <= 8 );
+			Assert.assertTrue(ffdE.getPosition() <= 9 );
 			Assert.assertTrue(0 <= ffdE.getPosition() );
-			if(ffdE.getPosition() == 8) {
+			if(ffdE.getPosition() == 9) {
 				Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
 			}
 		}
 		ffd.clear();
-		
+
 		TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex
 		Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
-		
-		Assert.assertEquals(6, typeComplexNested.getArity());
-		Assert.assertEquals(8, typeComplexNested.getTotalFields());
+
+		Assert.assertEquals(7, typeComplexNested.getArity());
+		Assert.assertEquals(9, typeComplexNested.getTotalFields());
 		PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;
-		
+
 		boolean dateSeen = false, intSeen = false, floatSeen = false,
-				tupleSeen = false, objectSeen = false, writableSeen = false;
+				tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
 		for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
 			PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
 			String name = field.field.getName();
@@ -330,6 +352,13 @@ public class PojoTypeExtractionTest {
 				writableSeen = true;
 				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
 				Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
+			} else if(name.equals("collection")) {
+				if(collectionSeen) {
+					Assert.fail("already seen");
+				}
+				collectionSeen = true;
+				Assert.assertEquals(new GenericTypeInfo(List.class), field.type);
+
 			} else {
 				Assert.fail("field "+field+" is not expected");
 			}
@@ -340,29 +369,29 @@ public class PojoTypeExtractionTest {
 		Assert.assertTrue("Field was not present", tupleSeen);
 		Assert.assertTrue("Field was not present", objectSeen);
 		Assert.assertTrue("Field was not present", writableSeen);
-		
+		Assert.assertTrue("Field was not present", collectionSeen);
+
 		TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
 		Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
-		
+
 		Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
 		Assert.assertEquals(typeInfo.getArity(), 2);
 	}
 
 	// Kryo is required for this, so disable for now.
-	@Ignore
 	@Test
 	public void testPojoAllPublic() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
 		checkAllPublicAsserts(typeForClass);
-		
+
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() );
 		checkAllPublicAsserts(typeForObject);
 	}
-	
+
 	private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
 		Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
-		Assert.assertEquals(9, typeInformation.getArity());
-		Assert.assertEquals(11, typeInformation.getTotalFields());
+		Assert.assertEquals(10, typeInformation.getArity());
+		Assert.assertEquals(12, typeInformation.getTotalFields());
 		// check if the three additional fields are identified correctly
 		boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false;
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
@@ -390,9 +419,9 @@ public class PojoTypeExtractionTest {
 				strArraySeen = true;
 				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
 				Assert.assertEquals(String[].class, field.type.getTypeClass());
-			} else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
+			} else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
 				// ignore these, they are inherited from the ComplexNestedClass
-			} 
+			}
 			else {
 				Assert.fail("field "+field+" is not expected");
 			}
@@ -401,18 +430,18 @@ public class PojoTypeExtractionTest {
 		Assert.assertTrue("Field was not present", multisetSeen);
 		Assert.assertTrue("Field was not present", strArraySeen);
 	}
-	
+
 	@Test
 	public void testPojoExtendingTuple() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class);
 		checkFromTuplePojo(typeForClass);
-		
+
 		FromTuple ft = new FromTuple();
 		ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
 		TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft);
 		checkFromTuplePojo(typeForObject);
 	}
-	
+
 	private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
 		Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
 		Assert.assertEquals(4, typeInformation.getTotalFields());
@@ -431,7 +460,7 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
+
 	@Test
 	public void testPojoWithGenerics() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
@@ -453,13 +482,12 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
+
 	/**
 	 * Test if the TypeExtractor is accepting untyped generics,
 	 * making them GenericTypes
 	 */
 	@Test
-	@Ignore // kryo needed.
 	public void testPojoWithGenericsSomeFieldsGeneric() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
 		Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
@@ -478,8 +506,8 @@ public class PojoTypeExtractionTest {
 			}
 		}
 	}
-	
-	
+
+
 	@Test
 	public void testPojoWithComplexHierarchy() {
 		TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
@@ -554,10 +582,10 @@ public class PojoTypeExtractionTest {
 		public VertexTyped() {
 		}
 	}
-	
+
 	@Test
 	public void testGetterSetterWithVertex() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<VertexTyped> set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0));
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
index cacc05b..d604105 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -143,7 +143,7 @@ abstract public class AbstractGenericTypeSerializerTest {
 		}
 	}
 
-	private final <T> void runTests(T... instances) {
+	protected final <T> void runTests(T... instances) {
 		if (instances == null || instances.length == 0) {
 			throw new IllegalArgumentException();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
index c6ef4db..37dba4e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -25,4 +25,4 @@ public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparator
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
 		return new KryoSerializer<T>(type);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index f6fc987..3c22b15 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -18,11 +18,53 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
 
 public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+	@Test
+	public void testJavaList(){
+		Collection<Integer> a = new ArrayList<Integer>();
+
+		fillCollection(a);
+
+		runTests(a);
+	}
+
+	@Test
+	public void testJavaSet(){
+		Collection<Integer> b = new HashSet<Integer>();
+
+		fillCollection(b);
+
+		runTests(b);
+	}
+
+	@Test
+	public void testJavaDequeue(){
+		Collection<Integer> c = new LinkedList<Integer>();
+
+		fillCollection(c);
+
+		runTests(c);
+	}
+
+	private void fillCollection(Collection<Integer> coll){
+		coll.add(42);
+		coll.add(1337);
+		coll.add(49);
+		coll.add(1);
+	}
+
 	@Override
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
 		return new KryoSerializer<T>(type);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index dab1bd5..5139fb5 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -95,6 +95,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
new file mode 100644
index 0000000..ddbe322
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.junit.Test
+
+import scala.reflect._
+
+class KryoGenericTypeSerializerTest {
+
+  @Test
+  def testScalaListSerialization: Unit = {
+    val a = List(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMutablelistSerialization: Unit = {
+    val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaMapSerialization: Unit = {
+    val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
+
+    runTests(a)
+  }
+
+  @Test
+  def testMutableMapSerialization: Unit ={
+    val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
+
+    runTests(a)
+  }
+
+  @Test
+  def testScalaListComplexTypeSerialization: Unit = {
+    val a = ComplexType("1234", 42, List(1,2,3,4))
+    val b = ComplexType("4321", 24, List(4,3,2,1))
+    val c = ComplexType("1337", 1, List(1))
+    val list = List(a, b, c)
+
+    runTests(list)
+  }
+
+  @Test
+  def testHeterogenousScalaList: Unit = {
+    val a = new DerivedType("foo", "bar")
+    val b = new BaseType("foobar")
+    val c = new DerivedType2("bar", "foo")
+    val list = List(a,b,c)
+
+    runTests(list)
+  }
+
+  case class ComplexType(id: String, number: Int, values: List[Int]){
+    override def equals(obj: Any): Boolean ={
+      if(obj != null && obj.isInstanceOf[ComplexType]){
+        val complexType = obj.asInstanceOf[ComplexType]
+        id.equals(complexType.id) && number.equals(complexType.number) && values.equals(
+          complexType.values)
+      }else{
+        false
+      }
+    }
+  }
+
+  class BaseType(val name: String){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[BaseType]){
+        obj.asInstanceOf[BaseType].name.equals(name)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  class DerivedType2(name: String, val sub: String) extends BaseType(name){
+    override def equals(obj: Any): Boolean = {
+      if(obj != null && obj.isInstanceOf[DerivedType2]){
+        super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub)
+      }else{
+        false
+      }
+    }
+  }
+
+  def runTests[T : ClassTag](objects: T *): Unit ={
+    val clsTag = classTag[T]
+    val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
+    val serializer = typeInfo.createSerializer()
+    val typeClass = typeInfo.getTypeClass
+
+    val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)
+
+    instance.testAll()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 60a0d89..8994ba9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 26;
+	private static int NUM_PROGRAMS = 28;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -763,7 +763,74 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 					// return expected result
 					return "b\nccc\nee\n";
 				}
-				
+
+				case 27: {
+					/*
+					 * Test Java collections within POJOs (i.e. exercise the Kryo serializer)
+					 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+					// group by the POJO's int field "key"
+					DataSet<String> reduceDs = ds.groupBy("key")
+							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+								@Override
+								public void reduce(
+										Iterable<CollectionDataSets.PojoWithCollection> values,
+										Collector<String> out) throws Exception {
+									StringBuilder concat = new StringBuilder();
+									concat.append("call");
+									for(CollectionDataSets.PojoWithCollection value : values) {
+										concat.append("For key "+value.key+" we got: ");
+										for(CollectionDataSets.Pojo1 p :value.pojos) {
+											concat.append("pojo.a="+p.a);
+										}
+									}
+									out.collect(concat.toString());
+								}
+							});
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+				}
+
+				case 28: {
+					/*
+					 * Group by generic type
+					 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+					// group by the BigInteger field "bigInt" (the generic-type key under test)
+					DataSet<String> reduceDs = ds.groupBy("bigInt")
+							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+								@Override
+								public void reduce(
+										Iterable<CollectionDataSets.PojoWithCollection> values,
+										Collector<String> out) throws Exception {
+									StringBuilder concat = new StringBuilder();
+									concat.append("call");
+									for(CollectionDataSets.PojoWithCollection value : values) {
+										concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+									}
+									out.collect(concat.toString());
+								}
+							});
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "call\n" +
+							"For key 92233720368547758070 we got:\n" +
+							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
+							"For key 92233720368547758070 we got:\n" +
+							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+				}
+
 				default: {
 					throw new IllegalArgumentException("Invalid program id");
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1f812d9..895e996 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.test.javaApiOperators.util;
 
+import java.io.File;
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.hadoop.io.IntWritable;
+import scala.math.BigInt;
 
 /**
  * #######################################################################################################
@@ -496,6 +503,13 @@ public class CollectionDataSets {
 	public static class Pojo1 {
 		public String a;
 		public String b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = a;
+			this.b = b;
+		}
 	}
 
 	public static class Pojo2 {
@@ -561,5 +575,68 @@ public class CollectionDataSets {
 		return env.fromCollection(data);
 	}
 
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public int key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<PojoWithCollection>();
+
+		List<Pojo1> pojosList1 = new ArrayList<Pojo1>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<Pojo1>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = 0;
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+		pwc1.sqlDate = new java.sql.Date(2000000000000L); // 2033 ;)
+		pwc1.mixed = new ArrayList<Object>();
+		Map<String, Integer> map = new HashMap<String, Integer>();
+		map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection();
+		pwc2.pojos = pojosList2;
+		pwc2.key = 0;
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+		pwc2.sqlDate = new java.sql.Date(200000000000L); // 1976
+
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
 }