You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/17 15:39:18 UTC
[3/6] flink git commit: [hotfix] [utils] Change the parameter 'numSample' in DataSetUtils.sampleWithSize() to 'numSamples', remove redundant Java 6 code
[hotfix] [utils] Change the parameter 'numSample' in DataSetUtils.sampleWithSize() to 'numSamples', remove redundant Java 6 code
This closes #1362
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e692a108
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e692a108
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e692a108
Branch: refs/heads/master
Commit: e692a10801629d31a6cd06b8f173363ea060859d
Parents: a654760
Author: smarthi <sm...@apache.org>
Authored: Mon Nov 16 17:00:30 2015 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 17 15:38:05 2015 +0100
----------------------------------------------------------------------
.../flink/api/java/utils/DataSetUtils.java | 24 +++++++++-----------
.../apache/flink/api/scala/utils/package.scala | 6 ++---
.../flink/test/util/DataSetUtilsITCase.java | 2 +-
3 files changed, 15 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
index b91dc82..9a1e952 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.SampleInPartition;
import org.apache.flink.api.java.functions.SampleWithFraction;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.sampling.IntermediateSampleData;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
@@ -59,7 +58,7 @@ public final class DataSetUtils {
counter++;
}
- out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter));
+ out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), counter));
}
});
}
@@ -109,7 +108,7 @@ public final class DataSetUtils {
@Override
public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) throws Exception {
for (T value: values) {
- out.collect(new Tuple2<Long, T>(start++, value));
+ out.collect(new Tuple2<>(start++, value));
}
}
}).withBroadcastSet(elementCount, "counts");
@@ -151,7 +150,7 @@ public final class DataSetUtils {
label = (start << shifter) + taskId;
if (getBitSize(start) + shifter < maxBitSize) {
- out.collect(new Tuple2<Long, T>(label, value));
+ out.collect(new Tuple2<>(label, value));
start++;
} else {
throw new Exception("Exceeded Long value range while generating labels");
@@ -209,15 +208,15 @@ public final class DataSetUtils {
* </p>
*
* @param withReplacement Whether element can be selected more than once.
- * @param numSample The expected sample size.
+ * @param numSamples The expected sample size.
* @return The sampled DataSet
*/
public static <T> DataSet<T> sampleWithSize(
DataSet <T> input,
final boolean withReplacement,
- final int numSample) {
+ final int numSamples) {
- return sampleWithSize(input, withReplacement, numSample, Utils.RNG.nextLong());
+ return sampleWithSize(input, withReplacement, numSamples, Utils.RNG.nextLong());
}
/**
@@ -228,24 +227,23 @@ public final class DataSetUtils {
* </p>
*
* @param withReplacement Whether element can be selected more than once.
- * @param numSample The expected sample size.
+ * @param numSamples The expected sample size.
* @param seed Random number generator seed.
* @return The sampled DataSet
*/
public static <T> DataSet<T> sampleWithSize(
DataSet <T> input,
final boolean withReplacement,
- final int numSample,
+ final int numSamples,
final long seed) {
- SampleInPartition sampleInPartition = new SampleInPartition<T>(withReplacement, numSample, seed);
+ SampleInPartition<T> sampleInPartition = new SampleInPartition<>(withReplacement, numSamples, seed);
MapPartitionOperator mapPartitionOperator = input.mapPartition(sampleInPartition);
// There is no previous group, so the parallelism of GroupReduceOperator is always 1.
String callLocation = Utils.getCallLocationName();
- SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<T>(withReplacement, numSample, seed);
- return new GroupReduceOperator<IntermediateSampleData<T>, T>(mapPartitionOperator,
- input.getType(), sampleInCoordinator, callLocation);
+ SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<>(withReplacement, numSamples, seed);
+ return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
index 0d0f6e2..82d4394 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/utils/package.scala
@@ -92,16 +92,16 @@ package object utils {
* <p/>
*
* @param withReplacement Whether element can be selected more than once.
- * @param numSample The expected sample size.
+ * @param numSamples The expected sample size.
* @param seed Random number generator seed.
* @return The sampled DataSet
*/
def sampleWithSize(
withReplacement: Boolean,
- numSample: Int,
+ numSamples: Int,
seed: Long = Utils.RNG.nextLong())
: DataSet[T] = {
- wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSample, seed))
+ wrap(jutils.sampleWithSize(self.javaSet, withReplacement, numSamples, seed))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e692a108/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 61c5fe1..478354a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -60,7 +60,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
});
// test if index is consecutive
for (int i = 0; i < expectedSize; i++) {
- Assert.assertEquals(i, (long) result.get(i).f0);
+ Assert.assertEquals(i, result.get(i).f0.longValue());
}
}