You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/15 23:18:38 UTC
flink git commit: [FLINK-3657] [dataSet] Change access of
DataSetUtils.countElements() to 'public'
Repository: flink
Updated Branches:
refs/heads/master d938c5f59 -> 5f993c65e
[FLINK-3657] [dataSet] Change access of DataSetUtils.countElements() to 'public'
This closes #1829
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f993c65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f993c65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f993c65
Branch: refs/heads/master
Commit: 5f993c65eeb1ae0d805e98b14bcb6603f9bff05f
Parents: d938c5f
Author: smarthi <sm...@apache.org>
Authored: Fri Apr 15 21:44:00 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 15 23:17:24 2016 +0200
----------------------------------------------------------------------
.../flink/api/java/utils/DataSetUtils.java | 5 ++---
.../apache/flink/api/scala/utils/package.scala | 14 ++++++++++++++
.../flink/test/util/DataSetUtilsITCase.java | 20 ++++++++++++++++----
.../api/scala/util/DataSetUtilsITCase.scala | 19 +++++++++++++++++--
4 files changed, 49 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index 61a71aa..756bb55 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -60,7 +60,7 @@ public final class DataSetUtils {
* @param input the DataSet received as input
* @return a data set containing tuples of (subtask index, number of elements) mappings.
*/
- private static <T> DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) {
+ public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input) {
return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
@@ -68,7 +68,6 @@ public final class DataSetUtils {
for (T value : values) {
counter++;
}
-
out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
}
});
@@ -83,7 +82,7 @@ public final class DataSetUtils {
*/
public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) {
- DataSet<Tuple2<Integer, Long>> elementCount = countElements(input);
+ DataSet<Tuple2<Integer, Long>> elementCount = countElementsPerPartition(input);
return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Long, T>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index adad9ab..d543998 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -48,6 +48,20 @@ package object utils {
implicit class DataSetUtils[T: TypeInformation : ClassTag](val self: DataSet[T]) {
/**
+ * Method that goes over all the elements in each partition in order to retrieve
+ * the total number of elements.
+ *
+ * @return a data set of tuple2 consisting of (subtask index, number of elements) mappings
+ */
+ def countElementsPerPartition: DataSet[(Int, Long)] = {
+ implicit val typeInfo = createTuple2TypeInformation[Int, Long](
+ BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]],
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ )
+ wrap(jutils.countElementsPerPartition(self.javaSet)).map { t => (t.f0.toInt, t.f1.toLong)}
+ }
+
+ /**
* Method that takes a set of subtask index, total number of elements mappings
* and assigns ids to all the elements from the input data set.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 4ccc6e2..afbcb89 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -18,8 +18,6 @@
package org.apache.flink.test.util;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,8 +30,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,12 +45,24 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testCountElementsPerPartition() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ long expectedSize = 100L;
+ DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
+
+ DataSet<Tuple2<Integer, Long>> ds = DataSetUtils.countElementsPerPartition(numbers);
+
+ Assert.assertEquals(env.getParallelism(), ds.count());
+ Assert.assertEquals(expectedSize, ds.sum(1).collect().get(0).f1.longValue());
+ }
+
+ @Test
public void testZipWithIndex() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
long expectedSize = 100L;
DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
- List<Tuple2<Long, Long>> result = Lists.newArrayList(DataSetUtils.zipWithIndex(numbers).collect());
+ List<Tuple2<Long, Long>> result = new ArrayList<>(DataSetUtils.zipWithIndex(numbers).collect());
Assert.assertEquals(expectedSize, result.size());
// sort result by created index
@@ -79,7 +91,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
}
});
- Set<Long> result = Sets.newHashSet(ids.collect());
+ Set<Long> result = new HashSet<>(ids.collect());
Assert.assertEquals(expectedSize, result.size());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f993c65/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
index 25ecc9c..83dd2a4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/DataSetUtilsITCase.scala
@@ -34,7 +34,7 @@ class DataSetUtilsITCase (
@Test
@throws(classOf[Exception])
def testZipWithIndex(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val env = ExecutionEnvironment.getExecutionEnvironment
val expectedSize = 100L
@@ -52,7 +52,7 @@ class DataSetUtilsITCase (
@Test
@throws(classOf[Exception])
def testZipWithUniqueId(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val env = ExecutionEnvironment.getExecutionEnvironment
val expectedSize = 100L
@@ -73,4 +73,19 @@ class DataSetUtilsITCase (
Assert.assertEquals(checksum.getCount, 15)
Assert.assertEquals(checksum.getChecksum, 55)
}
+
+ @Test
+ @throws(classOf[Exception])
+ def testCountElementsPerPartition(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val expectedSize = 100L
+
+ val numbers = env.generateSequence(0, expectedSize - 1)
+
+ val ds = numbers.countElementsPerPartition
+
+ Assert.assertEquals(env.getParallelism, ds.collect().size)
+ Assert.assertEquals(expectedSize, ds.sum(1).collect().head._2)
+ }
}