You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/18 13:33:44 UTC
spark git commit: [MINOR][DOC] Fix nits in JavaStreamingTestExample
Repository: spark
Updated Branches:
refs/heads/master 0f1015ffd -> 53f32a22d
[MINOR][DOC] Fix nits in JavaStreamingTestExample
## What changes were proposed in this pull request?
Fix some nits discussed in https://github.com/apache/spark/pull/11776#issuecomment-198207419:
- use `!rdd.isEmpty()` instead of `rdd.count() > 0`
- use a static `int` counter instead of an `Accumulator<Integer>`
- remove unneeded `throws Exception`
## How was this patch tested?
manual tests
Author: Zheng RuiFeng <ru...@foxmail.com>
Closes #11821 from zhengruifeng/je_fix.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53f32a22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53f32a22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53f32a22
Branch: refs/heads/master
Commit: 53f32a22daa40b713fef477054290d8adbeb6f71
Parents: 0f1015f
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Fri Mar 18 12:34:14 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Mar 18 12:34:14 2016 +0000
----------------------------------------------------------------------
.../mllib/JavaStreamingTestExample.java | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/53f32a22/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index 2197ef9..4c87559 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -56,6 +56,9 @@ import org.apache.spark.util.Utils;
* batches processed exceeds `numBatchesTimeout`.
*/
public class JavaStreamingTestExample {
+
+ private static int timeoutCounter = 0;
+
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaStreamingTestExample " +
@@ -76,7 +79,7 @@ public class JavaStreamingTestExample {
JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
new Function<String, BinarySample>() {
@Override
- public BinarySample call(String line) throws Exception {
+ public BinarySample call(String line) {
String[] ts = line.split(",");
boolean label = Boolean.valueOf(ts[0]);
double value = Double.valueOf(ts[1]);
@@ -94,22 +97,21 @@ public class JavaStreamingTestExample {
// $example off$
// Stop processing if test becomes significant or we time out
- final Accumulator<Integer> timeoutCounter =
- ssc.sparkContext().accumulator(numBatchesTimeout);
+ timeoutCounter = numBatchesTimeout;
out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
@Override
- public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
- timeoutCounter.add(-1);
+ public void call(JavaRDD<StreamingTestResult> rdd) {
+ timeoutCounter -= 1;
- long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
+ boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
@Override
- public Boolean call(StreamingTestResult v) throws Exception {
+ public Boolean call(StreamingTestResult v) {
return v.pValue() < 0.05;
}
- }).count();
+ }).isEmpty();
- if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
+ if (timeoutCounter <= 0 || anySignificant) {
rdd.context().stop();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org