You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 23:18:38 UTC

flink git commit: [FLINK-3657] [dataSet] Change access of DataSetUtils.countElements() to 'public'

Repository: flink
Updated Branches:
  refs/heads/master d938c5f59 -> 5f993c65e


[FLINK-3657] [dataSet] Change access of DataSetUtils.countElements() to 'public'

This closes #1829


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f993c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f993c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f993c65

Branch: refs/heads/master
Commit: 5f993c65eeb1ae0d805e98b14bcb6603f9bff05f
Parents: d938c5f
Author: smarthi <sm...@apache.org>
Authored: Fri Apr 15 21:44:00 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 15 23:17:24 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/utils/DataSetUtils.java      |  5 ++---
 .../apache/flink/api/scala/utils/package.scala  | 14 ++++++++++++++
 .../flink/test/util/DataSetUtilsITCase.java     | 20 ++++++++++++++++----
 .../api/scala/util/DataSetUtilsITCase.scala     | 19 +++++++++++++++++--
 4 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 61a71aa..756bb55 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -60,7 +60,7 @@ public final class DataSetUtils {
 	 * @param input the DataSet received as input
 	 * @return a data set containing tuples of subtask index, number of elements mappings.
 	 */
-	private static <T> DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
+	public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input) {
 		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() {
 			@Override
 			public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
@@ -68,7 +68,6 @@ public final class DataSetUtils {
 				for (T value : values) {
 					counter++;
 				}
-
 				out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
 			}
 		});
@@ -83,7 +82,7 @@ public final class DataSetUtils {
 	 */
 	public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
 
-		DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
+		DataSet<Tuple2<Integer, Long>> elementCount = countElementsPerPartition(input);
 
 		return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index adad9ab..d543998 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -48,6 +48,20 @@ package object utils {
   implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
 
     /**
+      * Method that goes over all the elements in each partition in order to count
+      * the number of elements contained in that partition.
+      *
+      * @return a data set of tuple2 mappings of (subtask index, number of elements)
+      */
+    def countElementsPerPartition: DataSet[(Int, Long)] = {
+      implicit val typeInfo = createTuple2TypeInformation[Int, Long](
+        BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]],
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+      )
+      wrap(jutils.countElementsPerPartition(self.javaSet)).map { t => (t.f0.toInt, t.f1.toLong)}
+    }
+
+    /**
      * Method that takes a set of subtask index, total number of elements mappings
      * and assigns ids to all the elements from the input data set.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 4ccc6e2..afbcb89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,8 +30,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -45,12 +45,24 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testCountElementsPerPartition() throws Exception {
+	 	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	 	long expectedSize = 100L;
+	 	DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
+
+	 	DataSet<Tuple2<Integer, Long>> ds = DataSetUtils.countElementsPerPartition(numbers);
+
+	 	Assert.assertEquals(env.getParallelism(), ds.count());
+	 	Assert.assertEquals(expectedSize, ds.sum(1).collect().get(0).f1.longValue());
+	}
+
+	@Test
 	public void testZipWithIndex() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		long expectedSize = 100L;
 		DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
 
-		List<Tuple2<Long, Long>> result = Lists.newArrayList(DataSetUtils.zipWithIndex(numbers).collect());
+		List<Tuple2<Long, Long>> result = new ArrayList<>(DataSetUtils.zipWithIndex(numbers).collect());
 
 		Assert.assertEquals(expectedSize, result.size());
 		// sort result by created index
@@ -79,7 +91,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 			}
 		});
 
-		Set<Long> result = Sets.newHashSet(ids.collect());
+		Set<Long> result = new HashSet<>(ids.collect());
 
 		Assert.assertEquals(expectedSize, result.size());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index 25ecc9c..83dd2a4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -34,7 +34,7 @@ class DataSetUtilsITCase (
   @Test
   @throws(classOf[Exception])
   def testZipWithIndex(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
     val expectedSize = 100L
 
@@ -52,7 +52,7 @@ class DataSetUtilsITCase (
   @Test
   @throws(classOf[Exception])
   def testZipWithUniqueId(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
     val expectedSize = 100L
 
@@ -73,4 +73,19 @@ class DataSetUtilsITCase (
     Assert.assertEquals(checksum.getCount, 15)
     Assert.assertEquals(checksum.getChecksum, 55)
   }
+
+  @Test
+  @throws(classOf[Exception])
+  def testCountElementsPerPartition(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val expectedSize = 100L
+
+    val numbers = env.generateSequence(0, expectedSize - 1)
+
+    val ds = numbers.countElementsPerPartition
+
+    Assert.assertEquals(env.getParallelism, ds.collect().size)
+    Assert.assertEquals(expectedSize, ds.sum(1).collect().head._2)
+  }
 }