You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:16 UTC
[80/82] [abbrv] incubator-flink git commit: Add proper task resource
removal in case a task submission fails
Add proper task resource removal in case a task submission fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/64b2602d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/64b2602d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/64b2602d
Branch: refs/heads/master
Commit: 64b2602d7d8719f22880b5fa14d5cd74ee7f78ba
Parents: 04b97c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 18 01:23:22 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:33 2014 +0100
----------------------------------------------------------------------
.../mapred/HadoopIOFormatsITCase.java | 9 --
.../org/apache/flink/yarn/YarnJobManager.scala | 6 +-
.../type/extractor/PojoTypeExtractionTest.java | 1 -
.../flink/runtime/jobmanager/JobManager.scala | 7 +-
.../flink/runtime/taskmanager/TaskManager.scala | 91 +++++--------
.../runtime/testingUtils/TestingCluster.scala | 1 -
.../runtime/KryoGenericTypeSerializerTest.scala | 2 +-
.../test/util/ForkableFlinkMiniCluster.scala | 2 +-
.../javaApiOperators/GroupReduceITCase.java | 135 ++++++++++---------
.../util/CollectionDataSets.java | 1 -
10 files changed, 114 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 6ef0f2e..32396b8 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -18,35 +18,26 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
-import org.apache.commons.lang.RandomStringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index aa5eb13..63c9b71 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -69,7 +69,7 @@ trait YarnJobManager extends ActorLogMessages {
case StopYarnSession(status) =>
log.info("Stopping Yarn Session.")
- instanceManager.getAllRegisteredInstances foreach {
+ instanceManager.getAllRegisteredInstances.asScala foreach {
instance =>
instance.getTaskManager ! StopYarnSession(status)
}
@@ -196,7 +196,7 @@ trait YarnJobManager extends ActorLogMessages {
case Some(rmClient) => {
val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
- for (container <- response.getAllocatedContainers) {
+ for (container <- response.getAllocatedContainers.asScala) {
log.info(s"Got new container for TM ${container.getId} on host ${
container.getNodeId.getHost}")
@@ -220,7 +220,7 @@ trait YarnJobManager extends ActorLogMessages {
}
}
- for (status <- response.getCompletedContainersStatuses) {
+ for (status <- response.getCompletedContainersStatuses.asScala) {
completedContainers += 1
log.info(s"Completed container ${status.getContainerId}. Total completed " +
s"$completedContainers.")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 7cff856..96ba16b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.HashMultiset;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0dbbd92..94d5c6e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -46,13 +46,14 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He
import org.apache.flink.runtime.profiling.ProfilingUtils
import org.slf4j.LoggerFactory
-import scala.collection.convert.WrapAsScala
import scala.concurrent.{Future}
import scala.concurrent.duration._
class JobManager(val configuration: Configuration) extends
-Actor with ActorLogMessages with ActorLogging with WrapAsScala {
+Actor with ActorLogMessages with ActorLogging {
import context._
+ import scala.collection.JavaConverters._
+
implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
@@ -158,7 +159,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
.getName}}).")
}
- for (vertex <- jobGraph.getVertices) {
+ for (vertex <- jobGraph.getVertices.asScala) {
val executableClass = vertex.getInvokableClassName
if (executableClass == null || executableClass.length == 0) {
throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bb4a241..9a612fb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -58,7 +58,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.util.ExceptionUtils
import org.slf4j.LoggerFactory
-import scala.collection.convert.{WrapAsScala, DecorateAsScala}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
@@ -67,10 +66,11 @@ import scala.util.Success
class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
val taskManagerConfig: TaskManagerConfiguration,
val networkConnectionConfig: NetworkConnectionConfiguration)
- extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
+ extends Actor with ActorLogMessages with ActorLogging {
import context._
import taskManagerConfig.{timeout => tmTimeout, _}
+ import scala.collection.JavaConverters._
implicit val timeout = tmTimeout
log.info(s"Starting task manager at ${self.path}.")
@@ -230,7 +230,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val executionID = tdd.getExecutionId
val taskIndex = tdd.getIndexInSubtaskGroup
val numSubtasks = tdd.getCurrentNumberOfSubtasks
- var jarsRegistered = false
var startRegisteringTask = 0L
try {
@@ -243,7 +242,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
startRegisteringTask)/1000.0}s")
}
- jarsRegistered = true
val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
@@ -285,7 +283,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val cpTasks = new util.HashMap[String, FutureTask[Path]]()
- for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
+ for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
cpTasks.put(entry.getKey, cp)
}
@@ -299,28 +297,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
} catch {
case t: Throwable =>
log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
- runningTasks.remove(executionID)
- for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
- fileCache.deleteTmpFile(entry.getKey, entry.getValue, jobID)
- }
-
- if (jarsRegistered) {
- try {
- libraryCacheManager.unregisterTask(jobID, executionID)
- } catch {
- case ioe: IOException =>
- if(log.isDebugEnabled) {
- log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
- }
- }
- }
+ unregisterTask(executionID)
sender ! new TaskOperationResult(executionID, false,
ExceptionUtils.stringifyException(t))
}
}
+ case UnregisterTask(executionID) => {
+ unregisterTask(executionID)
+ }
+
case SendHeartbeat => {
currentJobManager ! Heartbeat(instanceID)
}
@@ -348,38 +336,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
}
- case UnregisterTask(executionID) => {
- log.info(s"Unregister task with execution ID ${executionID}.")
- runningTasks.remove(executionID) match {
- case Some(task) =>
- if(task.getEnvironment != null) {
- for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
- .getJobConfiguration)) {
- fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
- }
- }
-
- channelManager foreach {
- _.unregister(executionID, task)
- }
-
- profiler foreach {
- _ ! UnmonitorTask(task.getExecutionId)
- }
-
- task.unregisterMemoryManager(memoryManager)
-
- try {
- libraryCacheManager.unregisterTask(task.getJobID, executionID)
- } catch {
- case ioe: IOException =>
- log.error(ioe, s"Unregistering the execution ${executionID} caused an IOException.")
- }
- case None =>
- log.error(s"Cannot find task with ID ${executionID} to unregister.")
- }
- }
-
case Terminated(jobManager) => {
log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
tryJobManagerRegistration()
@@ -393,13 +349,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
(jobID, executionID, executionState, optionalError)))(timeout)
- val receiver = this.self
-
futureResponse.mapTo[Boolean].onComplete {
case Success(result) =>
if (!result || executionState == ExecutionState.FINISHED || executionState ==
ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
- receiver ! UnregisterTask(executionID)
+ self ! UnregisterTask(executionID)
}
case Failure(t) =>
log.warning(s"Execution state change notification failed for task ${executionID} " +
@@ -461,6 +415,33 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
}
}
+
+ def unregisterTask(executionID: ExecutionAttemptID): Unit = {
+ log.info(s"Unregister task with execution ID ${executionID}.")
+ runningTasks.remove(executionID) match {
+ case Some(task) =>
+ if(task.getEnvironment != null) {
+ for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
+ .getJobConfiguration).asScala) {
+ fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+ }
+ }
+
+ channelManager foreach {
+ _.unregister(executionID, task)
+ }
+
+ profiler foreach {
+ _ ! UnmonitorTask(task.getExecutionId)
+ }
+
+ task.unregisterMemoryManager(memoryManager)
+
+ libraryCacheManager.unregisterTask(task.getJobID, executionID)
+ case None =>
+ log.error(s"Cannot find task with ID ${executionID} to unregister.")
+ }
+ }
}
object TaskManager {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 806d9b4..dbed4ff 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorSystem, Props}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index ddbe322..cadd7ff 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 649e094..71a634e 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.{Props, ActorSystem, ActorRef}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
+import org.apache.flink.runtime.testingUtils.{TestingTaskManager}
class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index aed7007..6cd484d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -771,76 +771,79 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
}
out.collect(concat.toString());
}
- case 27: {
- /*
- * Test Java collections within pojos ( == test kryo)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
-
- DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
- // f0.f0 is first integer
- DataSet<String> reduceDs = ds.groupBy("key")
- .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
- @Override
- public void reduce(
- Iterable<CollectionDataSets.PojoWithCollection> values,
- Collector<String> out) throws Exception {
- StringBuilder concat = new StringBuilder();
- concat.append("call");
- for(CollectionDataSets.PojoWithCollection value : values) {
- concat.append("For key "+value.key+" we got: ");
- for(CollectionDataSets.Pojo1 p :value.pojos) {
- concat.append("pojo.a="+p.a);
- }
- }
- out.collect(concat.toString());
- }
- });
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
- }
+ }
- case 28: {
- /*
- * Group by generic type
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
-
- DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
- // f0.f0 is first integer
- DataSet<String> reduceDs = ds.groupBy("bigInt")
- .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
- @Override
- public void reduce(
- Iterable<CollectionDataSets.PojoWithCollection> values,
- Collector<String> out) throws Exception {
- StringBuilder concat = new StringBuilder();
- concat.append("call");
- for(CollectionDataSets.PojoWithCollection value : values) {
- concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
- }
- out.collect(concat.toString());
- }
- });
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "call\n" +
- "For key 92233720368547758070 we got:\n" +
- "PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
- "For key 92233720368547758070 we got:\n" +
- "PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+ @Test
+ public void testJavaCollectionsWithinPojos() throws Exception {
+ /*
+ * Test Java collections within pojos ( == test kryo)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+ // f0.f0 is first integer
+ DataSet<String> reduceDs = ds.groupBy("key")
+ .reduceGroup(new GroupReducer7());
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+ }
+
+ public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
+ @Override
+ public void reduce(
+ Iterable<CollectionDataSets.PojoWithCollection> values,
+ Collector<String> out) throws Exception {
+ StringBuilder concat = new StringBuilder();
+ concat.append("call");
+ for(CollectionDataSets.PojoWithCollection value : values) {
+ concat.append("For key "+value.key+" we got: ");
+ for(CollectionDataSets.Pojo1 p :value.pojos) {
+ concat.append("pojo.a="+p.a);
}
+ }
+ out.collect(concat.toString());
+ }
+ }
+
+ @Test
+ public void testGroupByGenericType() throws Exception {
+ /*
+ * Group by generic type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+ // f0.f0 is first integer
+ DataSet<String> reduceDs = ds.groupBy("bigInt")
+ .reduceGroup(new GroupReducer8());
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+ expected = "call\n" +
+ "For key 92233720368547758070 we got:\n" +
+ "PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
+ "For key 92233720368547758070 we got:\n" +
+ "PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
}
-
-
+
+ public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
+ @Override
+ public void reduce(
+ Iterable<CollectionDataSets.PojoWithCollection> values,
+ Collector<String> out) throws Exception {
+ StringBuilder concat = new StringBuilder();
+ concat.append("call");
+ for(CollectionDataSets.PojoWithCollection value : values) {
+ concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+ }
+ out.collect(concat.toString());
+ }
+ }
+
public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
@Override
public void reduce(
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64b2602d/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 895e996..aee0031 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.Hashtable;
import java.util.List;
import java.util.Map;