Posted to commits@spark.apache.org by ma...@apache.org on 2014/05/07 02:28:01 UTC
[4/4] git commit: SPARK-1637: Clean up examples for 1.0
SPARK-1637: Clean up examples for 1.0
- [x] Move all of them into subpackages of org.apache.spark.examples (right now some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.mllib.examples); see the sketch after this list
- [x] Move Python examples into examples/src/main/python
- [x] Update docs to reflect these changes
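To make the new layout concrete, a hedged sketch follows (a hypothetical file, not one taken from this patch): under the unified namespace only the package declaration, and therefore the class name passed to `bin/run-example`, changes; example bodies stay the same.

    // Old package (pre-1.0 layout): org.apache.spark.streaming.examples
    // New package (this commit):    org.apache.spark.examples.streaming
    package org.apache.spark.examples.streaming

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical example, shown only to illustrate the package move.
    object PackageMoveSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "PackageMoveSketch", Seconds(1))
        ssc.socketTextStream("localhost", 9999).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }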
Author: Sandeep <sa...@techaddict.me>
This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <ma...@databricks.com>
Closes #571 from techaddict/SPARK-1637 and squashes the following commits:
47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples
8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples
5f96121 [Sandeep] Move Python examples into examples/src/main/python
0a8dd77 [Sandeep] Move all Scala examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.mllib.examples)
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a000b5c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a000b5c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a000b5c3
Branch: refs/heads/master
Commit: a000b5c3b0438c17e9973df4832c320210c29c27
Parents: 39b8b14
Author: Sandeep <sa...@techaddict.me>
Authored: Tue May 6 17:27:52 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue May 6 17:27:52 2014 -0700
----------------------------------------------------------------------
docs/index.md | 4 +-
docs/python-programming-guide.md | 4 +-
docs/streaming-programming-guide.md | 23 ++-
.../apache/spark/examples/mllib/JavaALS.java | 90 ++++++++++
.../apache/spark/examples/mllib/JavaKMeans.java | 84 +++++++++
.../org/apache/spark/examples/mllib/JavaLR.java | 82 +++++++++
.../examples/streaming/JavaFlumeEventCount.java | 76 ++++++++
.../examples/streaming/JavaKafkaWordCount.java | 112 ++++++++++++
.../streaming/JavaNetworkWordCount.java | 88 +++++++++
.../examples/streaming/JavaQueueStream.java | 86 +++++++++
.../apache/spark/mllib/examples/JavaALS.java | 90 ----------
.../apache/spark/mllib/examples/JavaKMeans.java | 84 ---------
.../org/apache/spark/mllib/examples/JavaLR.java | 82 ---------
.../streaming/examples/JavaFlumeEventCount.java | 75 --------
.../streaming/examples/JavaKafkaWordCount.java | 111 ------------
.../examples/JavaNetworkWordCount.java | 87 ---------
.../streaming/examples/JavaQueueStream.java | 85 ---------
examples/src/main/python/als.py | 87 +++++++++
examples/src/main/python/kmeans.py | 73 ++++++++
examples/src/main/python/logistic_regression.py | 76 ++++++++
examples/src/main/python/mllib/kmeans.py | 44 +++++
.../main/python/mllib/logistic_regression.py | 50 ++++++
examples/src/main/python/pagerank.py | 70 ++++++++
examples/src/main/python/pi.py | 37 ++++
examples/src/main/python/sort.py | 36 ++++
examples/src/main/python/transitive_closure.py | 66 +++++++
examples/src/main/python/wordcount.py | 35 ++++
.../apache/spark/examples/sql/RDDRelation.scala | 71 ++++++++
.../spark/examples/sql/hive/HiveFromSpark.scala | 64 +++++++
.../examples/streaming/ActorWordCount.scala | 177 +++++++++++++++++++
.../examples/streaming/FlumeEventCount.scala | 65 +++++++
.../examples/streaming/HdfsWordCount.scala | 55 ++++++
.../examples/streaming/KafkaWordCount.scala | 103 +++++++++++
.../examples/streaming/MQTTWordCount.scala | 109 ++++++++++++
.../examples/streaming/NetworkWordCount.scala | 61 +++++++
.../spark/examples/streaming/QueueStream.scala | 58 ++++++
.../examples/streaming/RawNetworkGrep.scala | 62 +++++++
.../streaming/RecoverableNetworkWordCount.scala | 122 +++++++++++++
.../streaming/StatefulNetworkWordCount.scala | 73 ++++++++
.../examples/streaming/StreamingExamples.scala | 38 ++++
.../examples/streaming/TwitterAlgebirdCMS.scala | 119 +++++++++++++
.../examples/streaming/TwitterAlgebirdHLL.scala | 96 ++++++++++
.../examples/streaming/TwitterPopularTags.scala | 74 ++++++++
.../examples/streaming/ZeroMQWordCount.scala | 101 +++++++++++
.../clickstream/PageViewGenerator.scala | 109 ++++++++++++
.../streaming/clickstream/PageViewStream.scala | 107 +++++++++++
.../spark/sql/examples/HiveFromSpark.scala | 64 -------
.../apache/spark/sql/examples/RDDRelation.scala | 71 --------
.../streaming/examples/ActorWordCount.scala | 177 -------------------
.../streaming/examples/FlumeEventCount.scala | 65 -------
.../streaming/examples/HdfsWordCount.scala | 55 ------
.../streaming/examples/KafkaWordCount.scala | 104 -----------
.../streaming/examples/MQTTWordCount.scala | 109 ------------
.../streaming/examples/NetworkWordCount.scala | 61 -------
.../spark/streaming/examples/QueueStream.scala | 58 ------
.../streaming/examples/RawNetworkGrep.scala | 66 -------
.../examples/RecoverableNetworkWordCount.scala | 122 -------------
.../examples/StatefulNetworkWordCount.scala | 73 --------
.../streaming/examples/StreamingExamples.scala | 38 ----
.../streaming/examples/TwitterAlgebirdCMS.scala | 119 -------------
.../streaming/examples/TwitterAlgebirdHLL.scala | 96 ----------
.../streaming/examples/TwitterPopularTags.scala | 74 --------
.../streaming/examples/ZeroMQWordCount.scala | 101 -----------
.../clickstream/PageViewGenerator.scala | 109 ------------
.../examples/clickstream/PageViewStream.scala | 107 -----------
python/examples/als.py | 87 ---------
python/examples/kmeans.py | 73 --------
python/examples/logistic_regression.py | 76 --------
python/examples/mllib/kmeans.py | 44 -----
python/examples/mllib/logistic_regression.py | 50 ------
python/examples/pagerank.py | 70 --------
python/examples/pi.py | 37 ----
python/examples/sort.py | 36 ----
python/examples/transitive_closure.py | 66 -------
python/examples/wordcount.py | 35 ----
.../spark/streaming/util/RawTextHelper.scala | 3 +-
76 files changed, 2872 insertions(+), 2875 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index e364771..a2f1a84 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -24,11 +24,11 @@ right version of Scala from [scala-lang.org](http://www.scala-lang.org/download/
# Running the Examples and Shell
-Spark comes with several sample programs. Scala and Java examples are in the `examples` directory, and Python examples are in `python/examples`.
+Spark comes with several sample programs. Scala, Java and Python examples are in the `examples/src/main` directory.
To run one of the Java or Scala sample programs, use `./bin/run-example <class> <params>` in the top-level Spark directory
(the `bin/run-example` script sets up the appropriate paths and launches that program).
For example, try `./bin/run-example org.apache.spark.examples.SparkPi local`.
-To run a Python sample program, use `./bin/pyspark <sample-program> <params>`. For example, try `./bin/pyspark ./python/examples/pi.py local`.
+To run a Python sample program, use `./bin/pyspark <sample-program> <params>`. For example, try `./bin/pyspark ./examples/src/main/python/pi.py local`.
Each example prints usage help when run with no parameters.
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/docs/python-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 8ea22e1..6813963 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -161,9 +161,9 @@ some example applications.
# Where to Go from Here
-PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples).
+PySpark also includes several sample programs in the [`examples/src/main/python` folder](https://github.com/apache/spark/tree/master/examples/src/main/python).
You can run them by passing the files to `pyspark`; e.g.:
- ./bin/spark-submit python/examples/wordcount.py
+ ./bin/spark-submit examples/src/main/python/wordcount.py
Each program prints usage help when run without arguments.
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index e8b718b..939599a 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -129,7 +129,7 @@ ssc.awaitTermination() // Wait for the computation to terminate
{% endhighlight %}
The complete code can be found in the Spark Streaming example
-[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala).
+[NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala).
<br>
</div>
@@ -215,7 +215,7 @@ jssc.awaitTermination(); // Wait for the computation to terminate
{% endhighlight %}
The complete code can be found in the Spark Streaming example
-[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java).
+[JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
<br>
</div>
@@ -234,12 +234,12 @@ Then, in a different terminal, you can start the example by using
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999
{% endhighlight %}
</div>
</div>
@@ -268,7 +268,7 @@ hello world
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
-$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
+$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
@@ -609,7 +609,7 @@ JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFu
The update function will be called for each word, with `newValues` having a sequence of 1's (from
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
Scala code, take a look at the example
-[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala).
+[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
<h4>Transform Operation</h4>
@@ -1135,7 +1135,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the
If the directory does not exist (i.e., running for the first time),
then the function `functionToCreateContext` will be called to create a new
context and set up the DStreams. See the Scala example
-[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala).
+[RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
This example appends the word counts of network data into a file.
You can also explicitly create a `StreamingContext` from the checkpoint data and start the
@@ -1174,7 +1174,7 @@ If the `checkpointDirectory` exists, then the context will be recreated from the
If the directory does not exist (i.e., running for the first time),
then the function `contextFactory` will be called to create a new
context and set up the DStreams. See the Scala example
-[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples/JavaRecoverableWordCount.scala)
+[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaRecoverableWordCount.scala)
(note that this example is missing in the 0.9 release, so you can test it using the master branch).
This example appends the word counts of network data into a file.
@@ -1374,7 +1374,6 @@ package and renamed for better clarity.
[ZeroMQUtils](api/java/org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/org/apache/spark/streaming/mqtt/MQTTUtils.html)
-* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples)
- and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples)
-* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and
-[video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
+* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
+ and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
+* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.
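Two of the guide hunks above are easier to see in code. First, the update function described in the `updateStateByKey` hunk: a minimal Scala sketch, assuming `pairs: DStream[(String, Int)]` built from `(word, 1)` tuples as in the guide (this is not code from the commit):

    // newValues holds this batch's 1's for a word; runningCount holds the
    // word's previous state, if any. Stateful ops require a checkpoint
    // directory to be set on the StreamingContext.
    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
      Some(runningCount.getOrElse(0) + newValues.sum)
    }

    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

Second, the checkpoint-recovery pattern behind RecoverableNetworkWordCount: `StreamingContext.getOrCreate` rebuilds the context from checkpoint data when `checkpointDirectory` exists, and calls the factory function only on a first run. A hedged, self-contained sketch (the path, host, and port are assumptions):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDirectory = "/tmp/checkpoint"  // hypothetical path

    // Called only when no checkpoint exists yet.
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext("local[2]", "RecoverableSketch", Seconds(1))
      ssc.checkpoint(checkpointDirectory)
      val lines = ssc.socketTextStream("localhost", 9999)
      lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()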
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
new file mode 100644
index 0000000..4533c4c
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.recommendation.ALS;
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
+import org.apache.spark.mllib.recommendation.Rating;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+/**
+ * Example using MLLib ALS from Java.
+ */
+public final class JavaALS {
+
+ static class ParseRating implements Function<String, Rating> {
+ private static final Pattern COMMA = Pattern.compile(",");
+
+ @Override
+ public Rating call(String line) {
+ String[] tok = COMMA.split(line);
+ int x = Integer.parseInt(tok[0]);
+ int y = Integer.parseInt(tok[1]);
+ double rating = Double.parseDouble(tok[2]);
+ return new Rating(x, y, rating);
+ }
+ }
+
+ static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
+ @Override
+ public String call(Tuple2<Object, double[]> element) {
+ return element._1() + "," + Arrays.toString(element._2());
+ }
+ }
+
+ public static void main(String[] args) {
+
+ if (args.length != 5 && args.length != 6) {
+ System.err.println(
+ "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
+ System.exit(1);
+ }
+
+ int rank = Integer.parseInt(args[2]);
+ int iterations = Integer.parseInt(args[3]);
+ String outputDir = args[4];
+ int blocks = -1;
+ if (args.length == 6) {
+ blocks = Integer.parseInt(args[5]);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
+ JavaRDD<String> lines = sc.textFile(args[1]);
+
+ JavaRDD<Rating> ratings = lines.map(new ParseRating());
+
+ MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);
+
+ model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
+ outputDir + "/userFeatures");
+ model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
+ outputDir + "/productFeatures");
+ System.out.println("Final user/product features written to " + outputDir);
+
+ sc.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
new file mode 100644
index 0000000..0cfb8e6
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import java.util.regex.Pattern;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.clustering.KMeans;
+import org.apache.spark.mllib.clustering.KMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+
+/**
+ * Example using MLLib KMeans from Java.
+ */
+public final class JavaKMeans {
+
+ private static class ParsePoint implements Function<String, Vector> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
+ public Vector call(String line) {
+ String[] tok = SPACE.split(line);
+ double[] point = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ point[i] = Double.parseDouble(tok[i]);
+ }
+ return Vectors.dense(point);
+ }
+ }
+
+ public static void main(String[] args) {
+
+ if (args.length < 4) {
+ System.err.println(
+ "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
+ System.exit(1);
+ }
+
+ String inputFile = args[1];
+ int k = Integer.parseInt(args[2]);
+ int iterations = Integer.parseInt(args[3]);
+ int runs = 1;
+
+ if (args.length >= 5) {
+ runs = Integer.parseInt(args[4]);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
+ JavaRDD<String> lines = sc.textFile(inputFile);
+
+ JavaRDD<Vector> points = lines.map(new ParsePoint());
+
+ KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
+
+ System.out.println("Cluster centers:");
+ for (Vector center : model.clusterCenters()) {
+ System.out.println(" " + center);
+ }
+ double cost = model.computeCost(points.rdd());
+ System.out.println("Cost: " + cost);
+
+ sc.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
new file mode 100644
index 0000000..f6e48b4
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import java.util.regex.Pattern;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
+import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+/**
+ * Logistic regression based classification using ML Lib.
+ */
+public final class JavaLR {
+
+ static class ParsePoint implements Function<String, LabeledPoint> {
+ private static final Pattern COMMA = Pattern.compile(",");
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
+ public LabeledPoint call(String line) {
+ String[] parts = COMMA.split(line);
+ double y = Double.parseDouble(parts[0]);
+ String[] tok = SPACE.split(parts[1]);
+ double[] x = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ x[i] = Double.parseDouble(tok[i]);
+ }
+ return new LabeledPoint(y, Vectors.dense(x));
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 4) {
+ System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
+ System.exit(1);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
+ JavaRDD<String> lines = sc.textFile(args[1]);
+ JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
+ double stepSize = Double.parseDouble(args[2]);
+ int iterations = Integer.parseInt(args[3]);
+
+ // Another way to configure LogisticRegression
+ //
+ // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
+ // lr.optimizer().setNumIterations(iterations)
+ // .setStepSize(stepSize)
+ // .setMiniBatchFraction(1.0);
+ // lr.setIntercept(true);
+ // LogisticRegressionModel model = lr.run(points.rdd());
+
+ LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
+ iterations, stepSize);
+
+ System.out.print("Final w: " + model.weights());
+
+ sc.stop();
+ }
+}
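The commented-out block in JavaLR above points at a second configuration path: constructing LogisticRegressionWithSGD directly and tuning its optimizer, rather than calling the static train helper. A hedged Scala sketch of that path (MLlib 1.0-era API; input assumed to match ParsePoint's "label,f1 f2 f3 ..." format):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    object LROptimizerSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LROptimizerSketch")
        // Parse "label,f1 f2 f3 ..." lines, mirroring ParsePoint above.
        val points = sc.textFile(args(0)).map { line =>
          val parts = line.split(',')
          LabeledPoint(parts(0).toDouble,
            Vectors.dense(parts(1).split(' ').map(_.toDouble)))
        }.cache()

        // Configure the optimizer directly instead of calling
        // LogisticRegressionWithSGD.train(input, numIterations, stepSize).
        val lr = new LogisticRegressionWithSGD()
        lr.optimizer
          .setNumIterations(100)
          .setStepSize(1.0)
          .setMiniBatchFraction(1.0)
        lr.setIntercept(true)
        val model = lr.run(points)

        println("Final w: " + model.weights)
        sc.stop()
      }
    }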
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
new file mode 100644
index 0000000..a5ece68
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 3) {
+ System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ String master = args[0];
+ String host = args[1];
+ int port = Integer.parseInt(args[2]);
+
+ Duration batchInterval = new Duration(2000);
+
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
+ JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
+
+ flumeStream.count();
+
+ flumeStream.count().map(new Function<Long, String>() {
+ @Override
+ public String call(Long in) {
+ return "Received " + in + " flume events.";
+ }
+ }).print();
+
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000..da51eb1
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 2 second batch size
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ new Duration(2000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
+
+ int numThreads = Integer.parseInt(args[4]);
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ String[] topics = args[3].split(",");
+ for (String topic: topics) {
+ topicMap.put(topic, numThreads);
+ }
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
+
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
new file mode 100644
index 0000000..ac84991
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.regex.Pattern;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999`
+ */
+public final class JavaNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+
+ // Create a JavaReceiverInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
new file mode 100644
index 0000000..8193119
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+public final class JavaQueueStream {
+ private JavaQueueStream() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("Usage: JavaQueueStream <master>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
+ System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
+
+ // Create the queue through which RDDs can be pushed to
+ // a QueueInputDStream
+ Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
+
+ // Create and push some RDDs into the queue
+ List<Integer> list = Lists.newArrayList();
+ for (int i = 0; i < 1000; i++) {
+ list.add(i);
+ }
+
+ for (int i = 0; i < 30; i++) {
+ rddQueue.add(ssc.sparkContext().parallelize(list));
+ }
+
+ // Create the QueueInputDStream and use it to do some processing
+ JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
+ JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
+ new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) {
+ return new Tuple2<Integer, Integer>(i % 10, 1);
+ }
+ });
+ JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ reducedStream.print();
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
deleted file mode 100644
index c516199..0000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.recommendation.ALS;
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
-import org.apache.spark.mllib.recommendation.Rating;
-
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
-
-/**
- * Example using MLLib ALS from Java.
- */
-public final class JavaALS {
-
- static class ParseRating implements Function<String, Rating> {
- private static final Pattern COMMA = Pattern.compile(",");
-
- @Override
- public Rating call(String line) {
- String[] tok = COMMA.split(line);
- int x = Integer.parseInt(tok[0]);
- int y = Integer.parseInt(tok[1]);
- double rating = Double.parseDouble(tok[2]);
- return new Rating(x, y, rating);
- }
- }
-
- static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
- @Override
- public String call(Tuple2<Object, double[]> element) {
- return element._1() + "," + Arrays.toString(element._2());
- }
- }
-
- public static void main(String[] args) {
-
- if (args.length != 5 && args.length != 6) {
- System.err.println(
- "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
- System.exit(1);
- }
-
- int rank = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
- String outputDir = args[4];
- int blocks = -1;
- if (args.length == 6) {
- blocks = Integer.parseInt(args[5]);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
- JavaRDD<String> lines = sc.textFile(args[1]);
-
- JavaRDD<Rating> ratings = lines.map(new ParseRating());
-
- MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);
-
- model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
- outputDir + "/userFeatures");
- model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
- outputDir + "/productFeatures");
- System.out.println("Final user/product features written to " + outputDir);
-
- sc.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
deleted file mode 100644
index 7461609..0000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import java.util.regex.Pattern;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.clustering.KMeans;
-import org.apache.spark.mllib.clustering.KMeansModel;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-
-/**
- * Example using MLLib KMeans from Java.
- */
-public final class JavaKMeans {
-
- private static class ParsePoint implements Function<String, Vector> {
- private static final Pattern SPACE = Pattern.compile(" ");
-
- @Override
- public Vector call(String line) {
- String[] tok = SPACE.split(line);
- double[] point = new double[tok.length];
- for (int i = 0; i < tok.length; ++i) {
- point[i] = Double.parseDouble(tok[i]);
- }
- return Vectors.dense(point);
- }
- }
-
- public static void main(String[] args) {
-
- if (args.length < 4) {
- System.err.println(
- "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
- System.exit(1);
- }
-
- String inputFile = args[1];
- int k = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
- int runs = 1;
-
- if (args.length >= 5) {
- runs = Integer.parseInt(args[4]);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
- JavaRDD<String> lines = sc.textFile(inputFile);
-
- JavaRDD<Vector> points = lines.map(new ParsePoint());
-
- KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
-
- System.out.println("Cluster centers:");
- for (Vector center : model.clusterCenters()) {
- System.out.println(" " + center);
- }
- double cost = model.computeCost(points.rdd());
- System.out.println("Cost: " + cost);
-
- sc.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
deleted file mode 100644
index e3ab87c..0000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import java.util.regex.Pattern;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
-import org.apache.spark.mllib.classification.LogisticRegressionModel;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.mllib.regression.LabeledPoint;
-
-/**
- * Logistic regression based classification using ML Lib.
- */
-public final class JavaLR {
-
- static class ParsePoint implements Function<String, LabeledPoint> {
- private static final Pattern COMMA = Pattern.compile(",");
- private static final Pattern SPACE = Pattern.compile(" ");
-
- @Override
- public LabeledPoint call(String line) {
- String[] parts = COMMA.split(line);
- double y = Double.parseDouble(parts[0]);
- String[] tok = SPACE.split(parts[1]);
- double[] x = new double[tok.length];
- for (int i = 0; i < tok.length; ++i) {
- x[i] = Double.parseDouble(tok[i]);
- }
- return new LabeledPoint(y, Vectors.dense(x));
- }
- }
-
- public static void main(String[] args) {
- if (args.length != 4) {
- System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
- System.exit(1);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
- JavaRDD<String> lines = sc.textFile(args[1]);
- JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
- double stepSize = Double.parseDouble(args[2]);
- int iterations = Integer.parseInt(args[3]);
-
- // Another way to configure LogisticRegression
- //
- // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
- // lr.optimizer().setNumIterations(iterations)
- // .setStepSize(stepSize)
- // .setMiniBatchFraction(1.0);
- // lr.setIntercept(true);
- // LogisticRegressionModel model = lr.train(points.rdd());
-
- LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
- iterations, stepSize);
-
- System.out.print("Final w: " + model.weights());
-
- sc.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
deleted file mode 100644
index c59f753..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <master> <host> <port>
- *
- * <master> is a Spark master URL
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- */
-public final class JavaFlumeEventCount {
- private JavaFlumeEventCount() {
- }
-
- public static void main(String[] args) {
- if (args.length != 3) {
- System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- String master = args[0];
- String host = args[1];
- int port = Integer.parseInt(args[2]);
-
- Duration batchInterval = new Duration(2000);
-
- JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
-
- flumeStream.count();
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- ssc.start();
- ssc.awaitTermination();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
deleted file mode 100644
index 8da9bcd..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <zkQuorum> is a list of one or more zookeeper servers that make quorum
- * <group> is the name of kafka consumer group
- * <topics> is a list of one or more kafka topics to consume from
- * <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- * `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
- * zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length < 5) {
-      System.err.println("Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context with a 2 second batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
-        new Duration(2000), System.getenv("SPARK_HOME"),
-        JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
-
-    int numThreads = Integer.parseInt(args[4]);
-    Map<String, Integer> topicMap = new HashMap<String, Integer>();
-    String[] topics = args[3].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-        KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
-
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
deleted file mode 100644
index 098c329..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import scala.Tuple2;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.regex.Pattern;
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: JavaNetworkWordCount <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
- */
-public final class JavaNetworkWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  public static void main(String[] args) {
-    if (args.length < 3) {
-      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
-        "In local mode, <master> should be 'local[n]' with n > 1");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
-        new Duration(1000), System.getenv("SPARK_HOME"),
-        JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
-
-    // Create a JavaReceiverInputDStream on the target ip:port and count the
-    // words in the input stream of \n delimited text (e.g. generated by 'nc')
-    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
deleted file mode 100644
index 88ad341..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-public final class JavaQueueStream {
-  private JavaQueueStream() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 1) {
-      System.err.println("Usage: JavaQueueStream <master>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
-      System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
-
-    // Create the queue through which RDDs can be pushed to
-    // a QueueInputDStream
-    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
-
-    // Create and push some RDDs into the queue
-    List<Integer> list = Lists.newArrayList();
-    for (int i = 0; i < 1000; i++) {
-      list.add(i);
-    }
-
-    for (int i = 0; i < 30; i++) {
-      rddQueue.add(ssc.sparkContext().parallelize(list));
-    }
-
-    // Create the QueueInputDStream and use it to do some processing
-    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
-    JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
-      new PairFunction<Integer, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Integer i) {
-          return new Tuple2<Integer, Integer>(i % 10, 1);
-        }
-      });
-    JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    reducedStream.print();
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/als.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
new file mode 100755
index 0000000..a77dfb2
--- /dev/null
+++ b/examples/src/main/python/als.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This example requires numpy (http://www.numpy.org/)
+"""
+from os.path import realpath
+import sys
+
+import numpy as np
+from numpy.random import rand
+from numpy import matrix
+from pyspark import SparkContext
+
+LAMBDA = 0.01 # regularization
+np.random.seed(42)
+
+def rmse(R, ms, us):
+    diff = R - ms * us.T
+    return np.sqrt(np.sum(np.power(diff, 2)) / (M * U))
+
+def update(i, vec, mat, ratings):
+    uu = mat.shape[0]
+    ff = mat.shape[1]
+    XtX = matrix(np.zeros((ff, ff)))
+    Xty = np.zeros((ff, 1))
+
+    for j in range(uu):
+        v = mat[j, :]
+        XtX += v.T * v
+        Xty += v.T * ratings[i, j]
+    XtX += np.eye(ff, ff) * LAMBDA * uu
+    return np.linalg.solve(XtX, Xty)
+
+if __name__ == "__main__":
+    if len(sys.argv) < 2:
+        print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
+    M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
+    U = int(sys.argv[3]) if len(sys.argv) > 3 else 500
+    F = int(sys.argv[4]) if len(sys.argv) > 4 else 10
+    ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5
+    slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2
+
+    print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
+        (M, U, F, ITERATIONS, slices)
+
+    R = matrix(rand(M, F)) * matrix(rand(U, F).T)
+    ms = matrix(rand(M, F))
+    us = matrix(rand(U, F))
+
+    Rb = sc.broadcast(R)
+    msb = sc.broadcast(ms)
+    usb = sc.broadcast(us)
+
+    for i in range(ITERATIONS):
+        ms = sc.parallelize(range(M), slices) \
+            .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
+            .collect()
+        # collect() returns a list, so the array ends up 3-d; keep the first
+        # two dimensions for the matrix.
+        ms = matrix(np.array(ms)[:, :, 0])
+        msb = sc.broadcast(ms)
+
+        us = sc.parallelize(range(U), slices) \
+            .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \
+            .collect()
+        us = matrix(np.array(us)[:, :, 0])
+        usb = sc.broadcast(us)
+
+        error = rmse(R, ms, us)
+        print "Iteration %d:" % i
+        print "\nRMSE: %5.4f\n" % error
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
new file mode 100755
index 0000000..e359648
--- /dev/null
+++ b/examples/src/main/python/kmeans.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The K-means algorithm written from scratch against PySpark. In practice,
+one may prefer to use the KMeans algorithm in MLlib, as shown in
+examples/src/main/python/mllib/kmeans.py.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+
+
+def parseVector(line):
+    return np.array([float(x) for x in line.split(' ')])
+
+
+def closestPoint(p, centers):
+    bestIndex = 0
+    closest = float("+inf")
+    for i in range(len(centers)):
+        tempDist = np.sum((p - centers[i]) ** 2)
+        if tempDist < closest:
+            closest = tempDist
+            bestIndex = i
+    return bestIndex
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 5:
+        print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonKMeans")
+    lines = sc.textFile(sys.argv[2])
+    data = lines.map(parseVector).cache()
+    K = int(sys.argv[3])
+    convergeDist = float(sys.argv[4])
+
+    kPoints = data.takeSample(False, K, 1)
+    tempDist = 1.0
+
+    while tempDist > convergeDist:
+        closest = data.map(
+            lambda p: (closestPoint(p, kPoints), (p, 1)))
+        pointStats = closest.reduceByKey(
+            lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
+        newPoints = pointStats.map(
+            lambda (x, (y, z)): (x, y / z)).collect()
+
+        tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
+
+        for (x, y) in newPoints:
+            kPoints[x] = y
+
+    print "Final centers: " + str(kPoints)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
new file mode 100755
index 0000000..fe5373c
--- /dev/null
+++ b/examples/src/main/python/logistic_regression.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A logistic regression implementation that uses NumPy (http://www.numpy.org)
+to act on batches of input data using efficient matrix operations.
+
+In practice, one may prefer to use the LogisticRegression algorithm in
+MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py.
+"""
+
+from collections import namedtuple
+from math import exp
+from os.path import realpath
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+
+
+D = 10 # Number of dimensions
+
+
+# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
+# make further computations faster.
+# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
+# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
+def readPointBatch(iterator):
+    strs = list(iterator)
+    matrix = np.zeros((len(strs), D + 1))
+    for i in xrange(len(strs)):
+        matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+    return [matrix]
+
+if __name__ == "__main__":
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
+    points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
+    iterations = int(sys.argv[3])
+
+    # Initialize w to a random value
+    w = 2 * np.random.ranf(size=D) - 1
+    print "Initial w: " + str(w)
+
+    # Compute the logistic regression gradient for a matrix of data points
+    def gradient(matrix, w):
+        Y = matrix[:, 0]   # point labels (first column of input file)
+        X = matrix[:, 1:]  # point coordinates
+        # For each point (x, y), compute the gradient function, then sum these up
+        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
+
+    def add(x, y):
+        x += y
+        return x
+
+    for i in range(iterations):
+        print "On iteration %i" % (i + 1)
+        w -= points.map(lambda m: gradient(m, w)).reduce(add)
+
+    print "Final w: " + str(w)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/mllib/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
new file mode 100755
index 0000000..dec82ff
--- /dev/null
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A K-means clustering program using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.clustering import KMeans
+
+
+def parseVector(line):
+    return np.array([float(x) for x in line.split(' ')])
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 4:
+        print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "KMeans")
+    lines = sc.textFile(sys.argv[2])
+    data = lines.map(parseVector)
+    k = int(sys.argv[3])
+    model = KMeans.train(data, k)
+    print "Final centers: " + str(model.clusterCenters)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/mllib/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
new file mode 100755
index 0000000..8631051
--- /dev/null
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Logistic regression using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+from math import exp
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+
+
+# Parse a line of text into an MLlib LabeledPoint object
+def parsePoint(line):
+    values = [float(s) for s in line.split(' ')]
+    if values[0] == -1:  # Convert -1 labels to 0 for MLlib
+        values[0] = 0
+    return LabeledPoint(values[0], values[1:])
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
+        exit(-1)
+    sc = SparkContext(sys.argv[1], "PythonLR")
+    points = sc.textFile(sys.argv[2]).map(parsePoint)
+    iterations = int(sys.argv[3])
+    model = LogisticRegressionWithSGD.train(points, iterations)
+    print "Final weights: " + str(model.weights)
+    print "Final intercept: " + str(model.intercept)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/pagerank.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
new file mode 100755
index 0000000..cd774cf
--- /dev/null
+++ b/examples/src/main/python/pagerank.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A simple PageRank implementation."""
+
+import re
+import sys
+from operator import add
+
+from pyspark import SparkContext
+
+
+def computeContribs(urls, rank):
+    """Calculates URL contributions to the ranks of other URLs."""
+    num_urls = len(urls)
+    for url in urls:
+        yield (url, rank / num_urls)
+
+
+def parseNeighbors(urls):
+    """Parses a pair of URLs from a whitespace-separated string."""
+    parts = re.split(r'\s+', urls)
+    return parts[0], parts[1]
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 3:
+ print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
+ exit(-1)
+
+ # Initialize the spark context.
+ sc = SparkContext(sys.argv[1], "PythonPageRank")
+
+ # Loads in input file. It should be in format of:
+ # URL neighbor URL
+ # URL neighbor URL
+ # URL neighbor URL
+ # ...
+ lines = sc.textFile(sys.argv[2], 1)
+
+ # Loads all URLs from input file and initialize their neighbors.
+ links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
+
+ # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
+ ranks = links.map(lambda (url, neighbors): (url, 1.0))
+
+ # Calculates and updates URL ranks continuously using PageRank algorithm.
+ for iteration in xrange(int(sys.argv[3])):
+ # Calculates URL contributions to the rank of other URLs.
+ contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
+ computeContribs(urls, rank))
+
+ # Re-calculates URL ranks based on neighbor contributions.
+ ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
+
+ # Collects all URL ranks and dump them to console.
+ for (link, rank) in ranks.collect():
+ print "%s has rank: %s." % (link, rank)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/pi.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
new file mode 100755
index 0000000..ab0645f
--- /dev/null
+++ b/examples/src/main/python/pi.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from random import random
+from operator import add
+
+from pyspark import SparkContext
+
+
+if __name__ == "__main__":
+ if len(sys.argv) == 1:
+ print >> sys.stderr, "Usage: pi <master> [<slices>]"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "PythonPi")
+ slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
+ n = 100000 * slices
+ def f(_):
+ x = random() * 2 - 1
+ y = random() * 2 - 1
+ return 1 if x ** 2 + y ** 2 < 1 else 0
+ count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
+ print "Pi is roughly %f" % (4.0 * count / n)
http://git-wip-us.apache.org/repos/asf/spark/blob/a000b5c3/examples/src/main/python/sort.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
new file mode 100755
index 0000000..5de20a6
--- /dev/null
+++ b/examples/src/main/python/sort.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 3:
+ print >> sys.stderr, "Usage: sort <master> <file>"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "PythonSort")
+ lines = sc.textFile(sys.argv[2], 1)
+ sortedCount = lines.flatMap(lambda x: x.split(' ')) \
+ .map(lambda x: (int(x), 1)) \
+ .sortByKey(lambda x: x)
+ # This is just a demo on how to bring all the sorted data back to a single node.
+ # In reality, we wouldn't want to collect all the data to the driver node.
+ output = sortedCount.collect()
+ for (num, unitcount) in output:
+ print num
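
The example expects space-separated integers on each line; every token
becomes an (int, 1) pair so sortByKey() can order the keys. A minimal sketch
of a matching input file; the path and values are hypothetical:

    # Write a toy input file for sort.py.
    with open("/tmp/sort_data.txt", "w") as f:
        f.write("5 3\n8\n1 9 2\n")
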