Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:16 UTC

[80/82] [abbrv] incubator-flink git commit: Add proper task resource removal in case a task submission fails

Add proper task resource removal in case a task submission fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/64b2602d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/64b2602d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/64b2602d

Branch: refs/heads/master
Commit: 64b2602d7d8719f22880b5fa14d5cd74ee7f78ba
Parents: 04b97c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 18 01:23:22 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:33 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopIOFormatsITCase.java           |   9 --
 .../org/apache/flink/yarn/YarnJobManager.scala  |   6 +-
 .../type/extractor/PojoTypeExtractionTest.java  |   1 -
 .../flink/runtime/jobmanager/JobManager.scala   |   7 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  91 +++++--------
 .../runtime/testingUtils/TestingCluster.scala   |   1 -
 .../runtime/KryoGenericTypeSerializerTest.scala |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../javaApiOperators/GroupReduceITCase.java     | 135 ++++++++++---------
 .../util/CollectionDataSets.java                |   1 -
 10 files changed, 114 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
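
The substance of this commit is the SubmitTask failure path in TaskManager.scala below: every resource acquired while instantiating a task (library-cache registration, distributed-cache files, channels, memory) is now released through the same unregisterTask path that serves a regular UnregisterTask message. A minimal, self-contained Scala sketch of that pattern follows -- the names and types are illustrative stand-ins, not the actual TaskManager API:

import scala.collection.mutable

object SubmitSketch {
  // stands in for runningTasks plus the library/file caches of the real code
  private val acquired = mutable.Map[String, List[String]]()

  def submitTask(id: String): Boolean =
    try {
      acquired(id) = List("jars", "tmp files", "channels") // acquire in order
      if (id == "bad") throw new RuntimeException("task instantiation failed")
      true
    } catch {
      case t: Throwable =>
        unregisterTask(id) // one cleanup path for every partial failure
        false
    }

  def unregisterTask(id: String): Unit =
    acquired.remove(id) match {
      case Some(rs) => println(s"Released ${rs.mkString(", ")} for $id.")
      case None     => println(s"Cannot find task with ID $id to unregister.")
    }

  def main(args: Array[String]): Unit = {
    println(submitTask("good")) // true; resources stay registered
    println(submitTask("bad"))  // false; everything acquired so far is freed
  }
}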


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 6ef0f2e..32396b8 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -18,35 +18,26 @@
 
 package org.apache.flink.test.hadoopcompatibility.mapred;
 
-import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index aa5eb13..63c9b71 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -69,7 +69,7 @@ trait YarnJobManager extends ActorLogMessages {
     case StopYarnSession(status) =>
       log.info("Stopping Yarn Session.")
 
-      instanceManager.getAllRegisteredInstances foreach {
+      instanceManager.getAllRegisteredInstances.asScala foreach {
         instance =>
           instance.getTaskManager ! StopYarnSession(status)
       }
@@ -196,7 +196,7 @@ trait YarnJobManager extends ActorLogMessages {
         case Some(rmClient) => {
           val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
 
-          for (container <- response.getAllocatedContainers) {
+          for (container <- response.getAllocatedContainers.asScala) {
             log.info(s"Got new container for TM ${container.getId} on host ${
               container.getNodeId.getHost}")
 
@@ -220,7 +220,7 @@ trait YarnJobManager extends ActorLogMessages {
             }
           }
 
-          for (status <- response.getCompletedContainersStatuses) {
+          for (status <- response.getCompletedContainersStatuses.asScala) {
             completedContainers += 1
             log.info(s"Completed container ${status.getContainerId}. Total completed " +
               s"$completedContainers.")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 7cff856..96ba16b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.HashMultiset;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0dbbd92..94d5c6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -46,13 +46,14 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.slf4j.LoggerFactory
 
-import scala.collection.convert.WrapAsScala
 import scala.concurrent.{Future}
 import scala.concurrent.duration._
 
 class JobManager(val configuration: Configuration) extends
-Actor with ActorLogMessages with ActorLogging with WrapAsScala {
+Actor with ActorLogMessages with ActorLogging {
   import context._
+  import scala.collection.JavaConverters._
+
   implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
     ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
 
@@ -158,7 +159,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
               .getName}}).")
           }
 
-          for (vertex <- jobGraph.getVertices) {
+          for (vertex <- jobGraph.getVertices.asScala) {
             val executableClass = vertex.getInvokableClassName
             if (executableClass == null || executableClass.length == 0) {
               throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bb4a241..9a612fb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -58,7 +58,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.util.ExceptionUtils
 import org.slf4j.LoggerFactory
 
-import scala.collection.convert.{WrapAsScala, DecorateAsScala}
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
@@ -67,10 +66,11 @@ import scala.util.Success
 class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
                   val taskManagerConfig: TaskManagerConfiguration,
                   val networkConnectionConfig: NetworkConnectionConfiguration)
-  extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
+  extends Actor with ActorLogMessages with ActorLogging {
 
   import context._
   import taskManagerConfig.{timeout => tmTimeout, _}
+  import scala.collection.JavaConverters._
   implicit val timeout = tmTimeout
 
   log.info(s"Starting task manager at ${self.path}.")
@@ -230,7 +230,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       val executionID = tdd.getExecutionId
       val taskIndex = tdd.getIndexInSubtaskGroup
       val numSubtasks = tdd.getCurrentNumberOfSubtasks
-      var jarsRegistered = false
       var startRegisteringTask = 0L
 
       try {
@@ -243,7 +242,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
             startRegisteringTask)/1000.0}s")
         }
-        jarsRegistered = true
 
         val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
 
@@ -285,7 +283,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
         val cpTasks = new util.HashMap[String, FutureTask[Path]]()
 
-        for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
+        for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
           val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
           cpTasks.put(entry.getKey, cp)
         }
@@ -299,28 +297,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       } catch {
         case t: Throwable =>
           log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
-          runningTasks.remove(executionID)
 
-          for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
-            fileCache.deleteTmpFile(entry.getKey, entry.getValue, jobID)
-          }
-
-          if (jarsRegistered) {
-            try {
-              libraryCacheManager.unregisterTask(jobID, executionID)
-            } catch {
-              case ioe: IOException =>
-                if(log.isDebugEnabled) {
-                  log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
-                }
-            }
-          }
+          unregisterTask(executionID)
 
           sender ! new TaskOperationResult(executionID, false,
             ExceptionUtils.stringifyException(t))
       }
     }
 
+    case UnregisterTask(executionID) => {
+      unregisterTask(executionID)
+    }
+
     case SendHeartbeat => {
       currentJobManager ! Heartbeat(instanceID)
     }
@@ -348,38 +336,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       }
     }
 
-    case UnregisterTask(executionID) => {
-      log.info(s"Unregister task with execution ID ${executionID}.")
-      runningTasks.remove(executionID) match {
-        case Some(task) =>
-          if(task.getEnvironment != null) {
-            for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
-              .getJobConfiguration)) {
-              fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
-            }
-          }
-
-          channelManager foreach {
-            _.unregister(executionID, task)
-          }
-
-          profiler foreach {
-            _ ! UnmonitorTask(task.getExecutionId)
-          }
-
-          task.unregisterMemoryManager(memoryManager)
-
-          try {
-            libraryCacheManager.unregisterTask(task.getJobID, executionID)
-          } catch {
-            case ioe: IOException =>
-              log.error(ioe, s"Unregistering the execution ${executionID} caused an IOException.")
-          }
-        case None =>
-          log.error(s"Cannot find task with ID ${executionID} to unregister.")
-      }
-    }
-
     case Terminated(jobManager) => {
       log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
       tryJobManagerRegistration()
@@ -393,13 +349,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
     val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
     (jobID, executionID, executionState, optionalError)))(timeout)
 
-    val receiver = this.self
-
     futureResponse.mapTo[Boolean].onComplete {
       case Success(result) =>
         if (!result || executionState == ExecutionState.FINISHED || executionState ==
           ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
-          receiver ! UnregisterTask(executionID)
+          self ! UnregisterTask(executionID)
         }
       case Failure(t) =>
         log.warning(s"Execution state change notification failed for task ${executionID} " +
@@ -461,6 +415,33 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       }
     }
   }
+
+  def unregisterTask(executionID: ExecutionAttemptID): Unit = {
+    log.info(s"Unregister task with execution ID ${executionID}.")
+    runningTasks.remove(executionID) match {
+      case Some(task) =>
+        if(task.getEnvironment != null) {
+          for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
+            .getJobConfiguration).asScala) {
+            fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+          }
+        }
+
+        channelManager foreach {
+          _.unregister(executionID, task)
+        }
+
+        profiler foreach {
+          _ ! UnmonitorTask(task.getExecutionId)
+        }
+
+        task.unregisterMemoryManager(memoryManager)
+
+        libraryCacheManager.unregisterTask(task.getJobID, executionID)
+      case None =>
+        log.error(s"Cannot find task with ID ${executionID} to unregister.")
+    }
+  }
 }
 
 object TaskManager {
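
Two smaller points in this file: the duplicated cleanup code in the catch block and in the UnregisterTask handler is collapsed into the single unregisterTask method, and the `val receiver = this.self` indirection is dropped because an actor's self is a stable, immutable ActorRef that is safe to close over in a Future callback -- unlike sender(), which changes per message and must be captured eagerly. A hedged sketch of that distinction, assuming a recent classic-actor Akka (names are illustrative):

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.Future

case class Work(payload: String)
case class Done(payload: String)

class CallbackActor extends Actor {
  import context.dispatcher // ExecutionContext for the Future below

  def receive = {
    case Work(p) =>
      val origin = sender() // must capture: sender() changes per message
      Future {
        self ! Done(p)      // safe: self never changes, no local copy needed
        origin ! s"ack: $p" // safe only because we captured the ref eagerly
      }
    case Done(p) => println(s"finished $p")
  }
}

object CallbackSketch extends App {
  val system = ActorSystem("sketch")
  system.actorOf(Props[CallbackActor], "callback") ! Work("task-1")
  Thread.sleep(500)
  system.terminate()
}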

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 806d9b4..dbed4ff 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{ActorSystem, Props}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
 import org.apache.flink.runtime.net.NetUtils

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index ddbe322..cadd7ff 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 649e094..71a634e 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.{Props, ActorSystem, ActorRef}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
+import org.apache.flink.runtime.testingUtils.{TestingTaskManager}
 
 class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
   extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index aed7007..6cd484d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -771,76 +771,79 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 			}
 			out.collect(concat.toString());
 		}
-				case 27: {
-					/*
-					 * Test Java collections within pojos ( == test kryo)
-					 */
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					env.setDegreeOfParallelism(1);
-
-					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
-					// f0.f0 is first integer
-					DataSet<String> reduceDs = ds.groupBy("key")
-							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
-								@Override
-								public void reduce(
-										Iterable<CollectionDataSets.PojoWithCollection> values,
-										Collector<String> out) throws Exception {
-									StringBuilder concat = new StringBuilder();
-									concat.append("call");
-									for(CollectionDataSets.PojoWithCollection value : values) {
-										concat.append("For key "+value.key+" we got: ");
-										for(CollectionDataSets.Pojo1 p :value.pojos) {
-											concat.append("pojo.a="+p.a);
-										}
-									}
-									out.collect(concat.toString());
-								}
-							});
-					reduceDs.writeAsText(resultPath);
-					env.execute();
-
-					// return expected result
-					return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
-				}
+	}
 
-				case 28: {
-					/*
-					 * Group by generic type
-					 */
-					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-					env.setDegreeOfParallelism(1);
-
-					DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
-					// f0.f0 is first integer
-					DataSet<String> reduceDs = ds.groupBy("bigInt")
-							.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
-								@Override
-								public void reduce(
-										Iterable<CollectionDataSets.PojoWithCollection> values,
-										Collector<String> out) throws Exception {
-									StringBuilder concat = new StringBuilder();
-									concat.append("call");
-									for(CollectionDataSets.PojoWithCollection value : values) {
-										concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
-									}
-									out.collect(concat.toString());
-								}
-							});
-					reduceDs.writeAsText(resultPath);
-					env.execute();
-
-					// return expected result
-					return "call\n" +
-							"For key 92233720368547758070 we got:\n" +
-							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
-							"For key 92233720368547758070 we got:\n" +
-							"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+	@Test
+	public void testJavaCollectionsWithinPojos() throws Exception {
+		/*
+		 * Test Java collections within pojos ( == test kryo)
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(1);
+
+		DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+		// f0.f0 is first integer
+		DataSet<String> reduceDs = ds.groupBy("key")
+				.reduceGroup(new GroupReducer7());
+		reduceDs.writeAsText(resultPath);
+		env.execute();
+
+		expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+	}
+
+	public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
+		@Override
+		public void reduce(
+				Iterable<CollectionDataSets.PojoWithCollection> values,
+				Collector<String> out) throws Exception {
+			StringBuilder concat = new StringBuilder();
+			concat.append("call");
+			for(CollectionDataSets.PojoWithCollection value : values) {
+				concat.append("For key "+value.key+" we got: ");
+				for(CollectionDataSets.Pojo1 p :value.pojos) {
+					concat.append("pojo.a="+p.a);
 				}
+			}
+			out.collect(concat.toString());
+		}
+	}
+
+	@Test
+	public void testGroupByGenericType() throws Exception {
+		/*
+		 * Group by generic type
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(1);
+
+		DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+		// f0.f0 is first integer
+		DataSet<String> reduceDs = ds.groupBy("bigInt")
+				.reduceGroup(new GroupReducer8());
+		reduceDs.writeAsText(resultPath);
+		env.execute();
 
+		expected = "call\n" +
+				"For key 92233720368547758070 we got:\n" +
+				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
+				"For key 92233720368547758070 we got:\n" +
+				"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
 	}
-	
-	
+
+	public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
+		@Override
+		public void reduce(
+				Iterable<CollectionDataSets.PojoWithCollection> values,
+				Collector<String> out) throws Exception {
+			StringBuilder concat = new StringBuilder();
+			concat.append("call");
+			for(CollectionDataSets.PojoWithCollection value : values) {
+				concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+			}
+			out.collect(concat.toString());
+		}
+	}
+
 	public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
 		@Override
 		public void reduce(
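
The test diff above does two things: it lifts the old switch-case bodies (cases 27 and 28) into individual @Test methods in the MultipleProgramsTestBase style, and it hoists the anonymous GroupReduceFunction instances into named static classes (GroupReducer7/8). One common reason for the second step is that an anonymous inner class keeps a hidden reference to its enclosing instance, so serializing the function would drag the enclosing test class along. A Scala sketch of that distinction (the commit's tests are Java, but the capture mechanics are the same):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object CaptureSketch {
  class Outer { // not Serializable, like a typical test harness
    val offset = 1
    // anonymous class: referencing `offset` captures the Outer instance
    val anonymous: (Int => Int) with Serializable =
      new (Int => Int) with Serializable {
        def apply(x: Int): Int = x + offset
      }
  }

  // standalone class: carries nothing beyond what it declares
  class Standalone extends (Int => Int) with Serializable {
    def apply(x: Int): Int = x + 1
  }

  private def trySerialize(o: AnyRef): Unit =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(o)
      println(s"${o.getClass.getName}: serialized fine")
    } catch {
      case e: NotSerializableException => println(s"failed: $e")
    }

  def main(args: Array[String]): Unit = {
    trySerialize(new Standalone)        // ok
    trySerialize((new Outer).anonymous) // fails: drags in non-serializable Outer
  }
}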

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 895e996..aee0031 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;