You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/08 15:30:14 UTC

[3/3] git commit: Disable POJO types

Disable POJO types

There are various issues when working with POJOs in the Java API, for
example [1].

This commit squashes the following commits:

1. Disable tests that depend on POJO types

2. Replace expression keys with key selector in test

3. PackagedProgramEndToEndITCase used a KMeans variant, which relies on
expression keys to select the field to group on. This commit replaces
this with a key selector.

4. Disable WordCountPOJO test case and package

5. Disable WordCountPOJOITCase, which executes the WordCountPOJO example.
The example code is still included with the source code (with notice
that it is currently not working), but will *not* be packaged as a JAR
anymore.

6. Disable tests that depend on POJO types (continued)

[1] https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E

This closes #93.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cb81fac8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cb81fac8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cb81fac8

Branch: refs/heads/release-0.6
Commit: cb81fac88b25d2dcadca32c11e88fba293d97700
Parents: 18f13ea
Author: uce <u....@fu-berlin.de>
Authored: Fri Aug 8 11:18:52 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Aug 8 15:29:14 2014 +0200

----------------------------------------------------------------------
 .../io/AvroInputFormatTypeExtractionTest.java   | 38 ++++++++++----------
 flink-examples/flink-java-examples/pom.xml      |  8 +++--
 .../example/java/wordcount/WordCountPOJO.java   |  9 +++++
 .../flink/api/java/typeutils/TypeExtractor.java | 10 +++---
 .../api/java/operator/CoGroupOperatorTest.java  |  7 +++-
 .../flink/api/java/operator/GroupingTest.java   |  5 +++
 .../api/java/operator/JoinOperatorTest.java     |  5 +++
 .../type/extractor/PojoTypeInformationTest.java |  7 +++-
 .../java/type/extractor/TypeExtractorTest.java  | 12 +++----
 .../typeutils/runtime/PojoSerializerTest.java   | 32 +++++++++--------
 .../WordCountPOJOITCase.java                    |  3 +-
 .../PackagedProgramEndToEndITCase.java          |  1 -
 .../flink/test/util/testjar/KMeansForTest.java  |  8 ++++-
 13 files changed, 91 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
index aa08006..efe45fd 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
@@ -19,16 +19,15 @@
 
 package org.apache.flink.api.java.io;
 
-import org.junit.Assert;
-import org.junit.Test;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.TypeInformation;
+import org.junit.Assert;
+import org.junit.Test;
 
 public class AvroInputFormatTypeExtractionTest {
 
@@ -36,44 +35,43 @@ public class AvroInputFormatTypeExtractionTest {
 	public void testTypeExtraction() {
 		try {
 			InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
-			
+
 			TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
-			
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			DataSet<MyAvroType> input = env.createInput(format);
 			TypeInformation<?> typeInfoDataSet = input.getType();
-			
-			
-			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
-			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
-			
+
+
+			Assert.assertTrue(typeInfoDirect instanceof GenericTypeInfo);
+			Assert.assertTrue(typeInfoDataSet instanceof GenericTypeInfo);
+
 			Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
 			Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
 		}
 	}
-	
+
 	public static final class MyAvroType {
-		
+
 		public String theString;
-		
+
 		private double aDouble;
-		
+
 		public double getaDouble() {
 			return aDouble;
 		}
-		
+
 		public void setaDouble(double aDouble) {
 			this.aDouble = aDouble;
 		}
-		
+
 		public void setTheString(String theString) {
 			this.theString = theString;
 		}
-		
+
 		public String getTheString() {
 			return theString;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index ea0db5d..97ebdec 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -291,7 +291,10 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- WordCountPOJO -->
+					<!-- WordCountPOJO
+
+					Note: disabled for now, because of problems with POJO types
+
 					<execution>
 						<id>WordCountPOJO</id>
 						<phase>package</phase>
@@ -313,8 +316,7 @@ under the License.
 								<include>**/java/wordcount/util/WordCountData.class</include>
 							</includes>
 						</configuration>
-					</execution>
-					
+					</execution> -->
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
index 04810a1..d993e60 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
@@ -53,6 +53,15 @@ public class WordCountPOJO {
 	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
+		// ====================================================================
+		// IMPORTANT
+		//
+		// Note: this example is currently not working, because support for
+		// POJO types has been disabled. As soon as all known issues (see [1])
+		// are fixed, we will enable POJO support again.
+		//
+		// [1] https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E
+		// ====================================================================
 
 		parseParameters(args);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index d03cc49..b596a87 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -792,11 +792,13 @@ public class TypeExtractor {
 			// special case handling for Class, this should not be handled by the POJO logic
 			return new GenericTypeInfo<X>(clazz);
 		}
-		TypeInformation<X> pojoType =  analyzePojo(clazz);
-		if (pojoType != null) {
-			return pojoType;
-		}
 
+//		Disable POJO types for now (see https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E)
+//
+//		TypeInformation<X> pojoType =  analyzePojo(clazz);
+//		if (pojoType != null) {
+//			return pojoType;
+//		}
 
 		// return a generic type
 		return new GenericTypeInfo<X>(clazz);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 4bc7ac2..360050b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.flink.api.java.DataSet;
@@ -128,6 +129,7 @@ public class CoGroupOperatorTest {
 		ds1.coGroup(ds2).where(5).equalTo(0);
 	}
 
+	@Ignore
 	@Test
 	public void testCoGroupKeyExpressions1() {
 
@@ -143,6 +145,7 @@ public class CoGroupOperatorTest {
 		}
 	}
 
+	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyExpressions2() {
 
@@ -154,6 +157,7 @@ public class CoGroupOperatorTest {
 		ds1.coGroup(ds2).where("myInt").equalTo("myString");
 	}
 
+	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testCoGroupKeyExpressions3() {
 
@@ -165,6 +169,7 @@ public class CoGroupOperatorTest {
 		ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
+	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testCoGroupKeyExpressions4() {
 
@@ -175,7 +180,7 @@ public class CoGroupOperatorTest {
 		// should not work, cogroup key non-existent
 		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
 	}
-	
+
 	@Test
 	public void testCoGroupKeySelectors1() {
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 0938bd9..98b6998 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.flink.api.java.DataSet;
@@ -112,6 +113,7 @@ public class GroupingTest {
 		tupleDs.groupBy(-1);
 	}
 
+	@Ignore
 	@Test
 	public void testGroupByKeyExpressions1() {
 
@@ -129,6 +131,7 @@ public class GroupingTest {
 		}
 	}
 
+	@Ignore
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGroupByKeyExpressions2() {
 
@@ -139,6 +142,7 @@ public class GroupingTest {
 		longDs.groupBy("myInt");
 	}
 
+	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testGroupByKeyExpressions3() {
 
@@ -152,6 +156,7 @@ public class GroupingTest {
 
 	}
 
+	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testGroupByKeyExpressions4() {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 8b88288..11eca83 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.flink.api.java.DataSet;
@@ -128,6 +129,7 @@ public class JoinOperatorTest {
 		ds1.join(ds2).where(5).equalTo(0);
 	}
 
+	@Ignore
 	@Test
 	public void testJoinKeyExpressions1() {
 
@@ -143,6 +145,7 @@ public class JoinOperatorTest {
 		}
 	}
 
+	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testJoinKeyExpressions2() {
 
@@ -154,6 +157,7 @@ public class JoinOperatorTest {
 		ds1.join(ds2).where("myInt").equalTo("myString");
 	}
 
+	@Ignore
 	@Test(expected = InvalidProgramException.class)
 	public void testJoinKeyExpressions3() {
 
@@ -165,6 +169,7 @@ public class JoinOperatorTest {
 		ds1.join(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
+	@Ignore
 	@Test(expected = IllegalArgumentException.class)
 	public void testJoinKeyExpressions4() {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
index b963574..cba8e88 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeInformationTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.TypeInformation;
+import org.junit.Ignore;
 import org.junit.Test;
 
 @SuppressWarnings("unused")
@@ -51,6 +52,7 @@ public class PojoTypeInformationTest {
 		Integer[] intWrapperArray;
 	}
 
+	@Ignore
 	@Test
 	public void testSimplePojoTypeExtraction() {
 		TypeInformation<SimplePojo> type = TypeExtractor.getForClass(SimplePojo.class);
@@ -66,6 +68,7 @@ public class PojoTypeInformationTest {
 		NestedPojoInner inner;
 	}
 
+	@Ignore
 	@Test
 	public void testNestedPojoTypeExtraction() {
 		TypeInformation<NestedPojoOuter> type = TypeExtractor.getForClass(NestedPojoOuter.class);
@@ -82,6 +85,7 @@ public class PojoTypeInformationTest {
 		Recursive1Pojo rec;
 	}
 
+	@Ignore
 	@Test
 	public void testRecursivePojoTypeExtraction() {
 		// This one tests whether a recursive pojo is detected using the set of visited
@@ -89,7 +93,8 @@ public class PojoTypeInformationTest {
 		TypeInformation<Recursive1Pojo> type = TypeExtractor.getForClass(Recursive1Pojo.class);
 		assertTrue("Extracted type is not a Pojo type but should be.", type instanceof PojoTypeInfo);
 	}
-	
+
+	@Ignore
 	@Test
 	public void testRecursivePojoObjectTypeExtraction() {
 		TypeInformation<Recursive1Pojo> type = TypeExtractor.getForObject(new Recursive1Pojo());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index c8a8ee9..412f751 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.java.typeutils.BasicArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -62,7 +61,6 @@ import org.junit.Test;
 
 public class TypeExtractorTest {
 
-	
 	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testBasicType() {
@@ -307,11 +305,11 @@ public class TypeExtractorTest {
 
 		Assert.assertFalse(ti.isBasicType());
 		Assert.assertFalse(ti.isTupleType());
-		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		Assert.assertTrue(ti instanceof GenericTypeInfo);
 		Assert.assertEquals(ti.getTypeClass(), CustomType.class);
 
 		// use getForClass()
-		Assert.assertTrue(TypeExtractor.getForClass(CustomType.class) instanceof PojoTypeInfo);
+		Assert.assertTrue(TypeExtractor.getForClass(CustomType.class) instanceof GenericTypeInfo);
 		Assert.assertEquals(TypeExtractor.getForClass(CustomType.class).getTypeClass(), ti.getTypeClass());
 
 		// use getForObject()
@@ -320,7 +318,7 @@ public class TypeExtractorTest {
 
 		Assert.assertFalse(ti2.isBasicType());
 		Assert.assertFalse(ti2.isTupleType());
-		Assert.assertTrue(ti2 instanceof PojoTypeInfo);
+		Assert.assertTrue(ti2 instanceof GenericTypeInfo);
 		Assert.assertEquals(ti2.getTypeClass(), CustomType.class);
 	}
 
@@ -360,7 +358,7 @@ public class TypeExtractorTest {
 		Assert.assertEquals(Tuple2.class, tti.getTypeClass());
 		
 		Assert.assertEquals(Long.class, tti.getTypeAt(0).getTypeClass());
-		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti.getTypeAt(1) instanceof GenericTypeInfo);
 		Assert.assertEquals(CustomType.class, tti.getTypeAt(1).getTypeClass());
 
 		// use getForObject()
@@ -373,7 +371,7 @@ public class TypeExtractorTest {
 		
 		Assert.assertEquals(Tuple2.class, tti2.getTypeClass());
 		Assert.assertEquals(Long.class, tti2.getTypeAt(0).getTypeClass());
-		Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti2.getTypeAt(1) instanceof GenericTypeInfo);
 		Assert.assertEquals(CustomType.class, tti2.getTypeAt(1).getTypeClass());
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 5aef84e..2d17764 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -18,48 +18,48 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.util.Random;
-
+import com.google.common.base.Objects;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.types.TypeInformation;
+import org.junit.Ignore;
 
-import com.google.common.base.Objects;
+import java.util.Random;
 
 /**
  * A test for the {@link org.apache.flink.api.java.typeutils.runtime.PojoSerializer}.
  */
+@Ignore
 public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> {
 	private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class);
 
 	@Override
 	protected TypeSerializer<TestUserClass> createSerializer() {
 		TypeSerializer<TestUserClass> serializer = type.createSerializer();
-		assert(serializer instanceof PojoSerializer);
+		assert (serializer instanceof PojoSerializer);
 		return serializer;
 	}
-	
+
 	@Override
 	protected int getLength() {
 		return -1;
 	}
-	
+
 	@Override
 	protected Class<TestUserClass> getTypeClass() {
 		return TestUserClass.class;
 	}
-	
+
 	@Override
 	protected TestUserClass[] getTestData() {
 		Random rnd = new Random(874597969123412341L);
 
-		return new TestUserClass[] {
-			new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[] {1,2,3},
-					new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[] {10, 11, 12})),
-			new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[] {4,5,6},
-					new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[] {20, 21, 22}))
+		return new TestUserClass[]{
+				new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3},
+						new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 11, 12})),
+				new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6},
+						new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22}))
 		};
 
 	}
@@ -73,7 +73,8 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 
 		private NestedTestUserClass nestedClass;
 
-		public TestUserClass() {}
+		public TestUserClass() {
+		}
 
 		public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, NestedTestUserClass nestedClass) {
 			this.dumm1 = dumm1;
@@ -124,7 +125,8 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		public double dumm3;
 		private int[] dumm4;
 
-		public NestedTestUserClass() {}
+		public NestedTestUserClass() {
+		}
 
 		public NestedTestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4) {
 			this.dumm1 = dumm1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
index f463924..7e16fd4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java
@@ -21,8 +21,9 @@ package org.apache.flink.test.exampleJavaPrograms;
 import org.apache.flink.example.java.wordcount.WordCountPOJO;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Ignore;
 
-
+@Ignore
 public class WordCountPOJOITCase extends JavaProgramTestBase {
 
 	protected String textPath;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 0e93f2a..d244811 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -64,7 +64,6 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.close();
 
 			String jarPath = "target/maven-test-jar.jar";
-//			String jarPath = "/home/aljoscha/maven-test-jar.jar";
 
 			// run KMeans
 			cluster.setNumTaskTracker(2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb81fac8/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 7149cd3..917615d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -79,7 +80,12 @@ public class KMeansForTest implements Program {
 			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
 			// count and sum point coordinates for each centroid
 			.map(new CountAppender())
-			.groupBy("f0").reduce(new CentroidAccumulator())
+			.groupBy(new KeySelector<DummyTuple3IntPointLong, Integer>() {
+				@Override
+				public Integer getKey(DummyTuple3IntPointLong value) throws Exception {
+					return value.f0;
+				}
+			}).reduce(new CentroidAccumulator())
 			// compute new centroids from point counts and coordinate sums
 			.map(new CentroidAverager());