You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ml...@apache.org on 2016/03/17 10:10:50 UTC

spark git commit: [MINOR][DOC] Add JavaStreamingTestExample

Repository: spark
Updated Branches:
  refs/heads/master 30c18841e -> 204c9dec2


[MINOR][DOC] Add JavaStreamingTestExample

## What changes were proposed in this pull request?

Add the java example of StreamingTest

## How was this patch tested?

manual tests in CLI: bin/run-example mllib.JavaStreamingTestExample dataDir 5 100

Author: Zheng RuiFeng <ru...@foxmail.com>

Closes #11776 from zhengruifeng/streaming_je.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/204c9dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/204c9dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/204c9dec

Branch: refs/heads/master
Commit: 204c9dec2c3876d20558ef5bda4dbd6edaf59643
Parents: 30c1884
Author: Zheng RuiFeng <ru...@foxmail.com>
Authored: Thu Mar 17 11:09:02 2016 +0200
Committer: Nick Pentreath <ni...@gmail.com>
Committed: Thu Mar 17 11:09:02 2016 +0200

----------------------------------------------------------------------
 docs/mllib-statistics.md                        |   7 ++
 .../mllib/JavaStreamingTestExample.java         | 121 +++++++++++++++++++
 2 files changed, 128 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/204c9dec/docs/mllib-statistics.md
----------------------------------------------------------------------
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 652d215..b773031 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -544,6 +544,13 @@ provides streaming hypothesis testing.
 
 {% include_example scala/org/apache/spark/examples/mllib/StreamingTestExample.scala %}
 </div>
+
+<div data-lang="java" markdown="1">
+[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest)
+provides streaming hypothesis testing.
+
+{% include_example java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java %}
+</div>
 </div>
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/204c9dec/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
new file mode 100644
index 0000000..2197ef9
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+// $example on$
+import org.apache.spark.mllib.stat.test.BinarySample;
+import org.apache.spark.mllib.stat.test.StreamingTest;
+import org.apache.spark.mllib.stat.test.StreamingTestResult;
+// $example off$
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.util.Utils;
+
+
+/**
+ * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data
+ * stream arrives as text files in a directory. Stops when the two groups are statistically
+ * significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded.
+ *
+ * The rows of the text files must be in the form `Boolean, Double`. For example:
+ *   false, -3.92
+ *   true, 99.32
+ *
+ * Usage:
+ *   JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>
+ *
+ * To run on your local machine using the directory `dataDir` with 5 seconds between each batch and
+ * a timeout after 100 insignificant batches, call:
+ *    $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100
+ *
+ * As you add text files to `dataDir` the significance test will continually update every
+ * `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of
+ * batches processed exceeds `numBatchesTimeout`.
+ */
+public class JavaStreamingTestExample {
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaStreamingTestExample " +
+        "<dataDir> <batchDuration> <numBatchesTimeout>");
+      System.exit(1);
+    }
+
+    String dataDir = args[0];
+    Duration batchDuration = Seconds.apply(Long.parseLong(args[1]));
+    int numBatchesTimeout = Integer.parseInt(args[2]);
+
+    SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
+    JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
+    // Checkpoint into a fresh temporary directory created under java.io.tmpdir.
+    ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
+
+    // $example on$
+    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
+      new Function<String, BinarySample>() {
+        @Override
+        public BinarySample call(String line) throws Exception {
+          String[] ts = line.split(",");  // each row has the form "Boolean, Double"
+          boolean label = Boolean.parseBoolean(ts[0].trim());  // a padded "true" must stay true
+          double value = Double.parseDouble(ts[1]);  // parseDouble ignores surrounding whitespace
+          return new BinarySample(label, value);
+        }
+      });
+
+    StreamingTest streamingTest = new StreamingTest()
+      .setPeacePeriod(0)
+      .setWindowSize(0)
+      .setTestMethod("welch");
+
+    JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
+    out.print();
+    // $example off$
+
+    // Stop processing if test becomes significant or we time out
+    final Accumulator<Integer> timeoutCounter =
+      ssc.sparkContext().accumulator(numBatchesTimeout);
+    // For every result RDD: count down the timeout, then look for a significant p-value.
+    out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
+      @Override
+      public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
+        timeoutCounter.add(-1);
+
+        long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
+          @Override
+          public Boolean call(StreamingTestResult v) throws Exception {
+            return v.pValue() < 0.05;
+          }
+        }).count();
+
+        if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
+          rdd.context().stop();
+        }
+      }
+    });
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org