You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/17 15:39:18 UTC

[3/6] flink git commit: [hotfix] [utils] Change the parameter 'numSample' in DataSetUtils.sampleWithSize() to 'numSamples', remove redundant Java 6 code

[hotfix] [utils] Change the parameter 'numSample' in DataSetUtils.sampleWithSize() to 'numSamples', remove redundant Java 6 code

This closes #1362


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e692a108
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e692a108
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e692a108

Branch: refs/heads/master
Commit: e692a10801629d31a6cd06b8f173363ea060859d
Parents: a654760
Author: smarthi <sm...@apache.org>
Authored: Mon Nov 16 17:00:30 2015 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 17 15:38:05 2015 +0100

----------------------------------------------------------------------
 .../flink/api/java/utils/DataSetUtils.java      | 24 +++++++++-----------
 .../apache/flink/api/scala/utils/package.scala  |  6 ++---
 .../flink/test/util/DataSetUtilsITCase.java     |  2 +-
 3 files changed, 15 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index b91dc82..9a1e952 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.SampleInPartition;
 import org.apache.flink.api.java.functions.SampleWithFraction;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -59,7 +58,7 @@ public final class DataSetUtils {
 					counter++;
 				}
 
-				out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
+				out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
 			}
 		});
 	}
@@ -109,7 +108,7 @@ public final class DataSetUtils {
 			@Override
 			public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
 				for (T value: values) {
-					out.collect(new Tuple2<Long, T>(start++, value));
+					out.collect(new Tuple2<>(start++, value));
 				}
 			}
 		}).withBroadcastSet(elementCount, "counts");
@@ -151,7 +150,7 @@ public final class DataSetUtils {
 					label = (start << shifter) + taskId;
 
 					if (getBitSize(start) + shifter < maxBitSize) {
-						out.collect(new Tuple2<Long, T>(label, value));
+						out.collect(new Tuple2<>(label, value));
 						start++;
 					} else {
 						throw new Exception("Exceeded Long value range while generating labels");
@@ -209,15 +208,15 @@ public final class DataSetUtils {
 	 * </p>
 	 *
 	 * @param withReplacement Whether element can be selected more than once.
-	 * @param numSample       The expected sample size.
+	 * @param numSamples       The expected sample size.
 	 * @return The sampled DataSet
 	 */
 	public static <T> DataSet<T> sampleWithSize(
 		DataSet <T> input,
 		final boolean withReplacement,
-		final int numSample) {
+		final int numSamples) {
 
-		return sampleWithSize(input, withReplacement, numSample, Utils.RNG.nextLong());
+		return sampleWithSize(input, withReplacement, numSamples, Utils.RNG.nextLong());
 	}
 
 	/**
@@ -228,24 +227,23 @@ public final class DataSetUtils {
 	 * </p>
 	 *
 	 * @param withReplacement Whether element can be selected more than once.
-	 * @param numSample       The expected sample size.
+	 * @param numSamples       The expected sample size.
 	 * @param seed            Random number generator seed.
 	 * @return The sampled DataSet
 	 */
 	public static <T> DataSet<T> sampleWithSize(
 		DataSet <T> input,
 		final boolean withReplacement,
-		final int numSample,
+		final int numSamples,
 		final long seed) {
 
-		SampleInPartition sampleInPartition = new SampleInPartition<T>(withReplacement, numSample, seed);
+		SampleInPartition<T> sampleInPartition = new SampleInPartition<>(withReplacement, numSamples, seed);
 		MapPartitionOperator mapPartitionOperator = input.mapPartition(sampleInPartition);
 
 		// There is no previous group, so the parallelism of GroupReduceOperator is always 1.
 		String callLocation = Utils.getCallLocationName();
-		SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<T>(withReplacement, numSample, seed);
-		return new GroupReduceOperator<IntermediateSampleData<T>, T>(mapPartitionOperator,
-			input.getType(), sampleInCoordinator, callLocation);
+		SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<>(withReplacement, numSamples, seed);
+		return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index 0d0f6e2..82d4394 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -92,16 +92,16 @@ package object utils {
      * <p/>
      *
      * @param withReplacement Whether element can be selected more than once.
-     * @param numSample       The expected sample size.
+     * @param numSamples       The expected sample size.
      * @param seed            Random number generator seed.
      * @return The sampled DataSet
      */
     def sampleWithSize(
         withReplacement: Boolean,
-        numSample: Int,
+        numSamples: Int,
         seed: Long = Utils.RNG.nextLong())
       : DataSet[T] = {
-      wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
+      wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 61c5fe1..478354a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -60,7 +60,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 		});
 		// test if index is consecutive
 		for (int i = 0; i < expectedSize; i++) {
-			Assert.assertEquals(i, (long) result.get(i).f0);
+			Assert.assertEquals(i, result.get(i).f0.longValue());
 		}
 	}