You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:47 UTC
[03/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
new file mode 100644
index 0000000..261813b
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
+public class JavaFlumeEventCount {
+ public static void main(String[] args) {
+ if (args.length != 3) {
+ System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+ System.exit(1);
+ }
+
+ String master = args[0];
+ String host = args[1];
+ int port = Integer.parseInt(args[2]);
+
+ Duration batchInterval = new Duration(2000);
+
+ JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ // Start the receiver on the <host> given on the command line, not a hard-coded "localhost".
+ JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
+
+ // Count the received events and print one summary line for each count.
+
+ flumeStream.count().map(new Function<Long, String>() {
+ @Override
+ public String call(Long in) {
+ return "Received " + in + " flume events.";
+ }
+ }).print();
+
+ sc.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
new file mode 100644
index 0000000..def87c1
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ */
+public class JavaNetworkWordCount {
+ public static void main(String[] args) {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1");
+ System.exit(1);
+ }
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (e.g. generated by 'nc')
+ JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split(" "));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
new file mode 100644
index 0000000..c8c7389
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+/** Feeds a queue of RDDs into a DStream and prints the element count per value of (i % 10). */
+public class JavaQueueStream {
+ public static void main(String[] args) throws InterruptedException {
+ if (args.length < 1) {
+ System.err.println("Usage: JavaQueueStream <master>");
+ System.exit(1);
+ }
+
+ // Create the streaming context (args[0] is the master, Duration(1000) the batch interval)
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ // Create the queue through which RDDs can be pushed to
+ // a QueueInputDStream
+ Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
+
+ // Build a list of the integers 0..999, then push 30 RDDs made from it into the queue
+ List<Integer> list = Lists.newArrayList();
+ for (int i = 0; i < 1000; i++) {
+ list.add(i);
+ }
+
+ for (int i = 0; i < 30; i++) {
+ rddQueue.add(ssc.sc().parallelize(list));
+ }
+
+
+ // Create the QueueInputDStream and use it to do some processing
+ JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
+ JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
+ new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i % 10, 1);
+ }
+ });
+ JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ reducedStream.print();
+ ssc.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java
deleted file mode 100644
index 9485e0c..0000000
--- a/examples/src/main/java/spark/examples/JavaHdfsLR.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-import spark.api.java.function.Function2;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.StringTokenizer;
-import java.util.Random;
-
-/**
- * Logistic regression based classification.
- */
-public class JavaHdfsLR {
-
- static int D = 10; // Number of dimensions
- static Random rand = new Random(42);
-
- static class DataPoint implements Serializable {
- public DataPoint(double[] x, double y) {
- this.x = x;
- this.y = y;
- }
-
- double[] x;
- double y;
- }
-
- static class ParsePoint extends Function<String, DataPoint> {
- public DataPoint call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- double y = Double.parseDouble(tok.nextToken());
- double[] x = new double[D];
- int i = 0;
- while (i < D) {
- x[i] = Double.parseDouble(tok.nextToken());
- i += 1;
- }
- return new DataPoint(x, y);
- }
- }
-
- static class VectorSum extends Function2<double[], double[], double[]> {
- public double[] call(double[] a, double[] b) {
- double[] result = new double[D];
- for (int j = 0; j < D; j++) {
- result[j] = a[j] + b[j];
- }
- return result;
- }
- }
-
- static class ComputeGradient extends Function<DataPoint, double[]> {
- double[] weights;
-
- public ComputeGradient(double[] weights) {
- this.weights = weights;
- }
-
- public double[] call(DataPoint p) {
- double[] gradient = new double[D];
- for (int i = 0; i < D; i++) {
- double dot = dot(weights, p.x);
- gradient[i] = (1 / (1 + Math.exp(-p.y * dot)) - 1) * p.y * p.x[i];
- }
- return gradient;
- }
- }
-
- public static double dot(double[] a, double[] b) {
- double x = 0;
- for (int i = 0; i < D; i++) {
- x += a[i] * b[i];
- }
- return x;
- }
-
- public static void printWeights(double[] a) {
- System.out.println(Arrays.toString(a));
- }
-
- public static void main(String[] args) {
-
- if (args.length < 3) {
- System.err.println("Usage: JavaHdfsLR <master> <file> <iters>");
- System.exit(1);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaRDD<String> lines = sc.textFile(args[1]);
- JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
- int ITERATIONS = Integer.parseInt(args[2]);
-
- // Initialize w to a random value
- double[] w = new double[D];
- for (int i = 0; i < D; i++) {
- w[i] = 2 * rand.nextDouble() - 1;
- }
-
- System.out.print("Initial w: ");
- printWeights(w);
-
- for (int i = 1; i <= ITERATIONS; i++) {
- System.out.println("On iteration " + i);
-
- double[] gradient = points.map(
- new ComputeGradient(w)
- ).reduce(new VectorSum());
-
- for (int j = 0; j < D; j++) {
- w[j] -= gradient[j];
- }
-
- }
-
- System.out.print("Final w: ");
- printWeights(w);
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaKMeans.java b/examples/src/main/java/spark/examples/JavaKMeans.java
deleted file mode 100644
index 2d34776..0000000
--- a/examples/src/main/java/spark/examples/JavaKMeans.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import scala.Tuple2;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-import spark.api.java.function.PairFunction;
-import spark.util.Vector;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * K-means clustering using Java API.
- */
-public class JavaKMeans {
-
- /** Parses numbers split by whitespace to a vector */
- static Vector parseVector(String line) {
- String[] splits = line.split(" ");
- double[] data = new double[splits.length];
- int i = 0;
- for (String s : splits)
- data[i] = Double.parseDouble(splits[i++]);
- return new Vector(data);
- }
-
- /** Computes the vector to which the input vector is closest using squared distance */
- static int closestPoint(Vector p, List<Vector> centers) {
- int bestIndex = 0;
- double closest = Double.POSITIVE_INFINITY;
- for (int i = 0; i < centers.size(); i++) {
- double tempDist = p.squaredDist(centers.get(i));
- if (tempDist < closest) {
- closest = tempDist;
- bestIndex = i;
- }
- }
- return bestIndex;
- }
-
- /** Computes the mean across all vectors in the input set of vectors */
- static Vector average(List<Vector> ps) {
- int numVectors = ps.size();
- Vector out = new Vector(ps.get(0).elements());
- // start from i = 1 since we already copied index 0 above
- for (int i = 1; i < numVectors; i++) {
- out.addInPlace(ps.get(i));
- }
- return out.divide(numVectors);
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.err.println("Usage: JavaKMeans <master> <file> <k> <convergeDist>");
- System.exit(1);
- }
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- String path = args[1];
- int K = Integer.parseInt(args[2]);
- double convergeDist = Double.parseDouble(args[3]);
-
- JavaRDD<Vector> data = sc.textFile(path).map(
- new Function<String, Vector>() {
- @Override
- public Vector call(String line) throws Exception {
- return parseVector(line);
- }
- }
- ).cache();
-
- final List<Vector> centroids = data.takeSample(false, K, 42);
-
- double tempDist;
- do {
- // allocate each vector to closest centroid
- JavaPairRDD<Integer, Vector> closest = data.map(
- new PairFunction<Vector, Integer, Vector>() {
- @Override
- public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
- return new Tuple2<Integer, Vector>(
- closestPoint(vector, centroids), vector);
- }
- }
- );
-
- // group by cluster id and average the vectors within each cluster to compute centroids
- JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
- Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
- new Function<List<Vector>, Vector>() {
- public Vector call(List<Vector> ps) throws Exception {
- return average(ps);
- }
- }).collectAsMap();
- tempDist = 0.0;
- for (int i = 0; i < K; i++) {
- tempDist += centroids.get(i).squaredDist(newCentroids.get(i));
- }
- for (Map.Entry<Integer, Vector> t: newCentroids.entrySet()) {
- centroids.set(t.getKey(), t.getValue());
- }
- System.out.println("Finished iteration (delta = " + tempDist + ")");
- } while (tempDist > convergeDist);
-
- System.out.println("Final centers:");
- for (Vector c : centroids)
- System.out.println(c);
-
- System.exit(0);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaLogQuery.java b/examples/src/main/java/spark/examples/JavaLogQuery.java
deleted file mode 100644
index d22684d..0000000
--- a/examples/src/main/java/spark/examples/JavaLogQuery.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import scala.Tuple3;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Executes a roll up-style query against Apache logs.
- */
-public class JavaLogQuery {
-
- public static List<String> exampleApacheLogs = Lists.newArrayList(
- "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
- "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
- "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
- ".NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
- "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" " +
- "62.24.11.25 images.com 1358492167 - Whatup",
- "10.10.10.10 - \"FRED\" [18/Jan/2013:18:02:37 +1100] \"GET http://images.com/2013/Generic.jpg " +
- "HTTP/1.1\" 304 306 \"http:/referall.com\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; " +
- "GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR " +
- "3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
- "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
- "0 73.23.2.15 images.com 1358492557 - Whatup");
-
- public static Pattern apacheLogRegex = Pattern.compile(
- "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
-
- /** Tracks the total query count and number of aggregate bytes for a particular group. */
- public static class Stats implements Serializable {
-
- private int count;
- private int numBytes;
-
- public Stats(int count, int numBytes) {
- this.count = count;
- this.numBytes = numBytes;
- }
- public Stats merge(Stats other) {
- return new Stats(count + other.count, numBytes + other.numBytes);
- }
-
- public String toString() {
- return String.format("bytes=%s\tn=%s", numBytes, count);
- }
- }
-
- public static Tuple3<String, String, String> extractKey(String line) {
- Matcher m = apacheLogRegex.matcher(line);
- List<String> key = Collections.emptyList();
- if (m.find()) {
- String ip = m.group(1);
- String user = m.group(3);
- String query = m.group(5);
- if (!user.equalsIgnoreCase("-")) {
- return new Tuple3<String, String, String>(ip, user, query);
- }
- }
- return new Tuple3<String, String, String>(null, null, null);
- }
-
- public static Stats extractStats(String line) {
- Matcher m = apacheLogRegex.matcher(line);
- if (m.find()) {
- int bytes = Integer.parseInt(m.group(7));
- return new Stats(1, bytes);
- }
- else
- return new Stats(1, 0);
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- System.err.println("Usage: JavaLogQuery <master> [logFile]");
- System.exit(1);
- }
-
- JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
-
- JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
- @Override
- public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
- return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
- }
- });
-
- JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
- @Override
- public Stats call(Stats stats, Stats stats2) throws Exception {
- return stats.merge(stats2);
- }
- });
-
- List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
- for (Tuple2 t : output) {
- System.out.println(t._1 + "\t" + t._2);
- }
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaPageRank.java b/examples/src/main/java/spark/examples/JavaPageRank.java
deleted file mode 100644
index 75df1af..0000000
--- a/examples/src/main/java/spark/examples/JavaPageRank.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import scala.Tuple2;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.Function;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFlatMapFunction;
-import spark.api.java.function.PairFunction;
-
-import java.util.List;
-import java.util.ArrayList;
-
-/**
- * Computes the PageRank of URLs from an input file. Input file should
- * be in format of:
- * URL neighbor URL
- * URL neighbor URL
- * URL neighbor URL
- * ...
- * where URL and their neighbors are separated by space(s).
- */
-public class JavaPageRank {
- private static class Sum extends Function2<Double, Double, Double> {
- @Override
- public Double call(Double a, Double b) {
- return a + b;
- }
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
- System.exit(1);
- }
-
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- // Loads in input file. It should be in format of:
- // URL neighbor URL
- // URL neighbor URL
- // URL neighbor URL
- // ...
- JavaRDD<String> lines = ctx.textFile(args[1], 1);
-
- // Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
- @Override
- public Tuple2<String, String> call(String s) {
- String[] parts = s.split("\\s+");
- return new Tuple2<String, String>(parts[0], parts[1]);
- }
- }).distinct().groupByKey().cache();
-
- // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
- @Override
- public Double call(List<String> rs) throws Exception {
- return 1.0;
- }
- });
-
- // Calculates and updates URL ranks continuously using PageRank algorithm.
- for (int current = 0; current < Integer.parseInt(args[2]); current++) {
- // Calculates URL contributions to the rank of other URLs.
- JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
- @Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
- List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
- for (String n : s._1) {
- results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
- }
- return results;
- }
- });
-
- // Re-calculates URL ranks based on neighbor contributions.
- ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
- @Override
- public Double call(Double sum) throws Exception {
- return 0.15 + sum * 0.85;
- }
- });
- }
-
- // Collects all URL ranks and dump them to console.
- List<Tuple2<String, Double>> output = ranks.collect();
- for (Tuple2 tuple : output) {
- System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
- }
-
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaSparkPi.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaSparkPi.java b/examples/src/main/java/spark/examples/JavaSparkPi.java
deleted file mode 100644
index d5f42fb..0000000
--- a/examples/src/main/java/spark/examples/JavaSparkPi.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-import spark.api.java.function.Function2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/** Computes an approximation to pi */
-public class JavaSparkPi {
-
-
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- System.err.println("Usage: JavaLogQuery <master> [slices]");
- System.exit(1);
- }
-
- JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
- int n = 100000 * slices;
- List<Integer> l = new ArrayList<Integer>(n);
- for (int i = 0; i < n; i++)
- l.add(i);
-
- JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
-
- int count = dataSet.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer integer) throws Exception {
- double x = Math.random() * 2 - 1;
- double y = Math.random() * 2 - 1;
- return (x * x + y * y < 1) ? 1 : 0;
- }
- }).reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return integer + integer2;
- }
- });
-
- System.out.println("Pi is roughly " + 4.0 * count / n);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
deleted file mode 100644
index 559d7f9..0000000
--- a/examples/src/main/java/spark/examples/JavaTC.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import scala.Tuple2;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.PairFunction;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Transitive closure on a graph, implemented in Java.
- */
-public class JavaTC {
-
- static int numEdges = 200;
- static int numVertices = 100;
- static Random rand = new Random(42);
-
- static List<Tuple2<Integer, Integer>> generateGraph() {
- Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
- while (edges.size() < numEdges) {
- int from = rand.nextInt(numVertices);
- int to = rand.nextInt(numVertices);
- Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
- if (from != to) edges.add(e);
- }
- return new ArrayList<Tuple2<Integer, Integer>>(edges);
- }
-
- static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
- Integer, Integer> {
- static ProjectFn INSTANCE = new ProjectFn();
-
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
- return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
- }
- }
-
- public static void main(String[] args) {
- if (args.length == 0) {
- System.err.println("Usage: JavaTC <host> [<slices>]");
- System.exit(1);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
- JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
-
- // Linear transitive closure: each round grows paths by one edge,
- // by joining the graph's edges with the already-discovered paths.
- // e.g. join the path (y, z) from the TC with the edge (x, y) from
- // the graph to obtain the path (x, z).
-
- // Because join() joins on keys, the edges are stored in reversed order.
- JavaPairRDD<Integer, Integer> edges = tc.map(
- new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
- return new Tuple2<Integer, Integer>(e._2(), e._1());
- }
- });
-
- long oldCount = 0;
- long nextCount = tc.count();
- do {
- oldCount = nextCount;
- // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
- // then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct().cache();
- nextCount = tc.count();
- } while (nextCount != oldCount);
-
- System.out.println("TC has " + tc.count() + " edges.");
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
deleted file mode 100644
index 1af370c..0000000
--- a/examples/src/main/java/spark/examples/JavaWordCount.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.examples;
-
-import scala.Tuple2;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class JavaWordCount {
- public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: JavaWordCount <master> <file>");
- System.exit(1);
- }
-
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaRDD<String> lines = ctx.textFile(args[1], 1);
-
- JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- public Iterable<String> call(String s) {
- return Arrays.asList(s.split(" "));
- }
- });
-
- JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
-
- JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
-
- List<Tuple2<String, Integer>> output = counts.collect();
- for (Tuple2 tuple : output) {
- System.out.println(tuple._1 + ": " + tuple._2);
- }
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/mllib/examples/JavaALS.java b/examples/src/main/java/spark/mllib/examples/JavaALS.java
deleted file mode 100644
index b48f459..0000000
--- a/examples/src/main/java/spark/mllib/examples/JavaALS.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.examples;
-
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-
-import spark.mllib.recommendation.ALS;
-import spark.mllib.recommendation.MatrixFactorizationModel;
-import spark.mllib.recommendation.Rating;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.StringTokenizer;
-
-import scala.Tuple2;
-
-/**
- * Example using MLLib ALS from Java.
- */
-public class JavaALS {
-
- static class ParseRating extends Function<String, Rating> {
- public Rating call(String line) {
- StringTokenizer tok = new StringTokenizer(line, ",");
- int x = Integer.parseInt(tok.nextToken());
- int y = Integer.parseInt(tok.nextToken());
- double rating = Double.parseDouble(tok.nextToken());
- return new Rating(x, y, rating);
- }
- }
-
- static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
- public String call(Tuple2<Object, double[]> element) {
- return element._1().toString() + "," + Arrays.toString(element._2());
- }
- }
-
- public static void main(String[] args) {
-
- if (args.length != 5 && args.length != 6) {
- System.err.println(
- "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
- System.exit(1);
- }
-
- int rank = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
- String outputDir = args[4];
- int blocks = -1;
- if (args.length == 6) {
- blocks = Integer.parseInt(args[5]);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaRDD<String> lines = sc.textFile(args[1]);
-
- JavaRDD<Rating> ratings = lines.map(new ParseRating());
-
- MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);
-
- model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
- outputDir + "/userFeatures");
- model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
- outputDir + "/productFeatures");
- System.out.println("Final user/product features written to " + outputDir);
-
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/spark/mllib/examples/JavaKMeans.java
deleted file mode 100644
index 02f4043..0000000
--- a/examples/src/main/java/spark/mllib/examples/JavaKMeans.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.examples;
-
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-
-import spark.mllib.clustering.KMeans;
-import spark.mllib.clustering.KMeansModel;
-
-import java.util.Arrays;
-import java.util.StringTokenizer;
-
-/**
- * Example using MLLib KMeans from Java.
- */
-public class JavaKMeans {
-
- static class ParsePoint extends Function<String, double[]> {
- public double[] call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- int numTokens = tok.countTokens();
- double[] point = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- point[i] = Double.parseDouble(tok.nextToken());
- }
- return point;
- }
- }
-
- public static void main(String[] args) {
-
- if (args.length < 4) {
- System.err.println(
- "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
- System.exit(1);
- }
-
- String inputFile = args[1];
- int k = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
- int runs = 1;
-
- if (args.length >= 5) {
- runs = Integer.parseInt(args[4]);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaRDD<String> lines = sc.textFile(args[1]);
-
- JavaRDD<double[]> points = lines.map(new ParsePoint());
-
- KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs);
-
- System.out.println("Cluster centers:");
- for (double[] center : model.clusterCenters()) {
- System.out.println(" " + Arrays.toString(center));
- }
- double cost = model.computeCost(points.rdd());
- System.out.println("Cost: " + cost);
-
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/mllib/examples/JavaLR.java b/examples/src/main/java/spark/mllib/examples/JavaLR.java
deleted file mode 100644
index bf4aeaf..0000000
--- a/examples/src/main/java/spark/mllib/examples/JavaLR.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.mllib.examples;
-
-
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function;
-
-import spark.mllib.classification.LogisticRegressionWithSGD;
-import spark.mllib.classification.LogisticRegressionModel;
-import spark.mllib.regression.LabeledPoint;
-
-import java.util.Arrays;
-import java.util.StringTokenizer;
-
-/**
- * Logistic regression based classification using ML Lib.
- */
-public class JavaLR {
-
- static class ParsePoint extends Function<String, LabeledPoint> {
- public LabeledPoint call(String line) {
- String[] parts = line.split(",");
- double y = Double.parseDouble(parts[0]);
- StringTokenizer tok = new StringTokenizer(parts[1], " ");
- int numTokens = tok.countTokens();
- double[] x = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- x[i] = Double.parseDouble(tok.nextToken());
- }
- return new LabeledPoint(y, x);
- }
- }
-
- public static void printWeights(double[] a) {
- System.out.println(Arrays.toString(a));
- }
-
- public static void main(String[] args) {
- if (args.length != 4) {
- System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
- System.exit(1);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaRDD<String> lines = sc.textFile(args[1]);
- JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
- double stepSize = Double.parseDouble(args[2]);
- int iterations = Integer.parseInt(args[3]);
-
- // Another way to configure LogisticRegression
- //
- // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
- // lr.optimizer().setNumIterations(iterations)
- // .setStepSize(stepSize)
- // .setMiniBatchFraction(1.0);
- // lr.setIntercept(true);
- // LogisticRegressionModel model = lr.train(points.rdd());
-
- LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
- iterations, stepSize);
-
- System.out.print("Final w: ");
- printWeights(model.weights());
-
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
deleted file mode 100644
index 096a9ae..0000000
--- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.streaming.examples;
-
-import spark.api.java.function.Function;
-import spark.streaming.*;
-import spark.streaming.api.java.*;
-import spark.streaming.dstream.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <master> <host> <port>
- *
- * <master> is a Spark master URL
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- */
-public class JavaFlumeEventCount {
- public static void main(String[] args) {
- if (args.length != 3) {
- System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
- System.exit(1);
- }
-
- String master = args[0];
- String host = args[1];
- int port = Integer.parseInt(args[2]);
-
- Duration batchInterval = new Duration(2000);
-
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
-
- flumeStream.count();
-
- flumeStream.count().map(new Function<Long, String>() {
- @Override
- public String call(Long in) {
- return "Received " + in + " flume events.";
- }
- }).print();
-
- sc.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
deleted file mode 100644
index c54d3f3..0000000
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.streaming.Duration;
-import spark.streaming.api.java.JavaDStream;
-import spark.streaming.api.java.JavaPairDStream;
-import spark.streaming.api.java.JavaStreamingContext;
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
- */
-public class JavaNetworkWordCount {
- public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1");
- System.exit(1);
- }
-
- // Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- // Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
- JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.map(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
- }
- });
-
- wordCounts.print();
- ssc.start();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
deleted file mode 100644
index 1f4a991..0000000
--- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import spark.api.java.JavaRDD;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.streaming.Duration;
-import spark.streaming.api.java.JavaDStream;
-import spark.streaming.api.java.JavaPairDStream;
-import spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-public class JavaQueueStream {
- public static void main(String[] args) throws InterruptedException {
- if (args.length < 1) {
- System.err.println("Usage: JavaQueueStream <master>");
- System.exit(1);
- }
-
- // Create the context
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- // Create the queue through which RDDs can be pushed to
- // a QueueInputDStream
- Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
-
- // Create and push some RDDs into the queue
- List<Integer> list = Lists.newArrayList();
- for (int i = 0; i < 1000; i++) {
- list.add(i);
- }
-
- for (int i = 0; i < 30; i++) {
- rddQueue.add(ssc.sc().parallelize(list));
- }
-
-
- // Create the QueueInputDStream and use it do some processing
- JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
- JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
- return new Tuple2<Integer, Integer>(i % 10, 1);
- }
- });
- JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) throws Exception {
- return i1 + i2;
- }
- });
-
- reducedStream.print();
- ssc.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
new file mode 100644
index 0000000..868ff81
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+
+object BroadcastTest {
+  // Broadcasts an Int array in two iterations, printing its size per element of a 10-element RDD.
+  // Usage: BroadcastTest <master> [<slices>] [numElem]
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(args(0), "Broadcast Test",
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val num = if (args.length > 2) args(2).toInt else 1000000
+
+    // The payload holds the values 0 until num; it is built once and never mutated afterwards.
+    val arr1 = Array.range(0, num)
+
+    for (i <- 0 until 2) {
+      println("Iteration " + i)
+      println("===========")
+      val barr1 = sc.broadcast(arr1)
+      sc.parallelize(1 to 10, slices).foreach {
+        _ => println(barr1.value.size)
+      }
+    }
+
+    System.exit(0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
new file mode 100644
index 0000000..33bf715
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
+import org.apache.cassandra.thrift._
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.nio.ByteBuffer
+import java.util.SortedMap
+import org.apache.cassandra.db.IColumn
+import org.apache.cassandra.utils.ByteBufferUtil
+import scala.collection.JavaConversions._
+
+
+/*
+ * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
+ * support for Hadoop.
+ *
+ * To run this example, run this file with the following command params -
+ * <spark_master> <cassandra_node> <cassandra_port>
+ *
+ * So if you want to run this on localhost this will be,
+ * local[3] localhost 9160
+ *
+ * The example makes some assumptions:
+ * 1. You have already created a keyspace called casDemo and it has a column family named Words
+ * 2. The column family has a column named "para" which has test content.
+ *
+ * You can create the content by running the following script at the bottom of this file with
+ * cassandra-cli.
+ *
+ */
+object CassandraTest {
+
+  /**
+   * Reads the "para" column of every row in casDemo.Words, counts the words, prints the
+   * counts and writes them to casDemo.WordCount.
+   *
+   * @param args <spark_master> <cassandra_node> <cassandra_port>
+   */
+  def main(args: Array[String]): Unit = {
+    // Print usage rather than failing with an ArrayIndexOutOfBoundsException below.
+    if (args.length < 3) {
+      System.err.println("Usage: CassandraTest <spark_master> <cassandra_node> <cassandra_port>")
+      System.exit(1)
+    }
+
+    // Get a SparkContext
+    val sc = new SparkContext(args(0), "casDemo")
+
+    // Build the job configuration with ConfigHelper provided by Cassandra
+    val job = new Job()
+    job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
+    val conf = job.getConfiguration()
+
+    val host: String = args(1)
+    val port: String = args(2)
+
+    ConfigHelper.setInputInitialAddress(conf, host)
+    ConfigHelper.setInputRpcPort(conf, port)
+    ConfigHelper.setOutputInitialAddress(conf, host)
+    ConfigHelper.setOutputRpcPort(conf, port)
+    ConfigHelper.setInputColumnFamily(conf, "casDemo", "Words")
+    ConfigHelper.setOutputColumnFamily(conf, "casDemo", "WordCount")
+
+    // Slice range with empty start and finish bounds
+    val predicate = new SlicePredicate()
+    val sliceRange = new SliceRange()
+    sliceRange.setStart(Array.empty[Byte])
+    sliceRange.setFinish(Array.empty[Byte])
+    predicate.setSlice_range(sliceRange)
+    ConfigHelper.setInputSlicePredicate(conf, predicate)
+
+    ConfigHelper.setInputPartitioner(conf, "Murmur3Partitioner")
+    ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner")
+
+    // Make a new Hadoop RDD
+    val casRdd = sc.newAPIHadoopRDD(
+      conf,
+      classOf[ColumnFamilyInputFormat],
+      classOf[ByteBuffer],
+      classOf[SortedMap[ByteBuffer, IColumn]])
+
+    // Let us first get all the paragraphs from the retrieved rows
+    val paraRdd = casRdd.map {
+      case (key, value) =>
+        ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
+    }
+
+    // Let's get the word count in paras
+    val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
+
+    counts.collect().foreach {
+      case (word, count) => println(word + ":" + count)
+    }
+
+    // Turn every (word, count) pair into one output row with a "word" and a "wcount" column
+    counts.map {
+      case (word, count) =>
+        val colWord = new org.apache.cassandra.thrift.Column()
+        colWord.setName(ByteBufferUtil.bytes("word"))
+        colWord.setValue(ByteBufferUtil.bytes(word))
+        colWord.setTimestamp(System.currentTimeMillis)
+
+        val colCount = new org.apache.cassandra.thrift.Column()
+        colCount.setName(ByteBufferUtil.bytes("wcount"))
+        colCount.setValue(ByteBufferUtil.bytes(count.toLong))
+        colCount.setTimestamp(System.currentTimeMillis)
+
+        val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
+
+        // The Scala list is converted to java.util.List by the JavaConversions implicits
+        val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
+        mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+        mutations.get(0).column_or_supercolumn.setColumn(colWord)
+        mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
+        mutations.get(1).column_or_supercolumn.setColumn(colCount)
+        (outputkey, mutations)
+    }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
+      classOf[ColumnFamilyOutputFormat], conf)
+  }
+}
+
+/*
+create keyspace casDemo;
+use casDemo;
+
+create column family WordCount with comparator = UTF8Type;
+update column family WordCount with column_metadata =
+ [{column_name: word, validation_class: UTF8Type},
+ {column_name: wcount, validation_class: LongType}];
+
+create column family Words with comparator = UTF8Type;
+update column family Words with column_metadata =
+ [{column_name: book, validation_class: UTF8Type},
+ {column_name: para, validation_class: UTF8Type}];
+
+assume Words keys as utf8;
+
+set Words['3musk001']['book'] = 'The Three Musketeers';
+set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
+ town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
+ be in as perfect a state of revolution as if the Huguenots had just made
+ a second La Rochelle of it. Many citizens, seeing the women flying
+ toward the High Street, leaving their children crying at the open doors,
+ hastened to don the cuirass, and supporting their somewhat uncertain
+ courage with a musket or a partisan, directed their steps toward the
+ hostelry of the Jolly Miller, before which was gathered, increasing
+ every minute, a compact group, vociferous and full of curiosity.';
+
+set Words['3musk002']['book'] = 'The Three Musketeers';
+set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
+ some city or other registering in its archives an event of this kind. There were
+ nobles, who made war against each other; there was the king, who made
+ war against the cardinal; there was Spain, which made war against the
+ king. Then, in addition to these concealed or public, secret or open
+ wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
+ who made war upon everybody. The citizens always took up arms readily
+ against thieves, wolves or scoundrels, often against nobles or
+ Huguenots, sometimes against the king, but never against cardinal or
+ Spain. It resulted, then, from this habit that on the said first Monday
+ of April, 1625, the citizens, on hearing the clamor, and seeing neither
+ the red-and-yellow standard nor the livery of the Duc de Richelieu,
+ rushed toward the hostel of the Jolly Miller. When arrived there, the
+ cause of the hubbub was apparent to all';
+
+set Words['3musk003']['book'] = 'The Three Musketeers';
+set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
+ large the sum may be; but you ought also to endeavor to perfect yourself in
+ the exercises becoming a gentleman. I will write a letter today to the
+ Director of the Royal Academy, and tomorrow he will admit you without
+ any expense to yourself. Do not refuse this little service. Our
+ best-born and richest gentlemen sometimes solicit it without being able
+ to obtain it. You will learn horsemanship, swordsmanship in all its
+ branches, and dancing. You will make some desirable acquaintances; and
+ from time to time you can call upon me, just to tell me how you are
+ getting on, and to say whether I can be of further service to you.';
+
+
+set Words['thelostworld001']['book'] = 'The Lost World';
+set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
+ against the red curtain. How beautiful she was! And yet how aloof! We had been
+ friends, quite good friends; but never could I get beyond the same
+ comradeship which I might have established with one of my
+ fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
+ and perfectly unsexual. My instincts are all against a woman being too
+ frank and at her ease with me. It is no compliment to a man. Where
+ the real sex feeling begins, timidity and distrust are its companions,
+ heritage from old wicked days when love and violence went often hand in
+ hand. The bent head, the averted eye, the faltering voice, the wincing
+ figure--these, and not the unshrinking gaze and frank reply, are the
+ true signals of passion. Even in my short life I had learned as much
+ as that--or had inherited it in that race memory which we call instinct.';
+
+set Words['thelostworld002']['book'] = 'The Lost World';
+set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
+ red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
+ the real boss; but he lived in the rarefied atmosphere of some Olympian
+ height from which he could distinguish nothing smaller than an
+ international crisis or a split in the Cabinet. Sometimes we saw him
+ passing in lonely majesty to his inner sanctum, with his eyes staring
+ vaguely and his mind hovering over the Balkans or the Persian Gulf. He
+ was above and beyond us. But McArdle was his first lieutenant, and it
+ was he that we knew. The old man nodded as I entered the room, and he
+ pushed his spectacles far up on his bald forehead.';
+
+*/
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
new file mode 100644
index 0000000..92eb96b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+
+/**
+ * Runs one task element per unit of default parallelism; each element throws an exception
+ * with probability 0.25.
+ *
+ * Arguments: <master>
+ */
+object ExceptionHandlingTest {
+  def main(args: Array[String]): Unit = {
+    if (args.isEmpty) {
+      System.err.println("Usage: ExceptionHandlingTest <master>")
+      System.exit(1)
+    }
+
+    val sparkHome = System.getenv("SPARK_HOME")
+    val jars = Seq(System.getenv("SPARK_EXAMPLES_JAR"))
+    val sc = new SparkContext(args(0), "ExceptionHandlingTest", sparkHome, jars)
+
+    val elements = sc.parallelize(0 until sc.defaultParallelism)
+    elements.foreach { _ =>
+      val shouldFail = math.random > 0.75
+      if (shouldFail) {
+        throw new Exception("Testing exception handling")
+      }
+    }
+
+    System.exit(0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000..42c2e0e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import java.util.Random
+
+/**
+ * Generates random (Int, Array[Byte]) pairs in each mapper, caches them, and prints the
+ * number of groups produced by groupByKey.
+ *
+ * Arguments: <master> [numMappers] [numKVPairs] [valSize] [numReducers].
+ * Defaults: 2 mappers, 1000 pairs per mapper, 1000-byte values, numReducers = numMappers.
+ */
+object GroupByTest {
+  def main(args: Array[String]): Unit = {
+    if (args.length == 0) {
+      // The fourth argument is the size in bytes of each value, not of the key.
+      System.err.println(
+        "Usage: GroupByTest <master> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    // None of these is reassigned, so they are vals; the closure below captures them.
+    val numMappers = if (args.length > 1) args(1).toInt else 2
+    val numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    val valSize = if (args.length > 3) args(3).toInt else 1000
+    val numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test",
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      val kvPairs = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        kvPairs(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      kvPairs
+    }.cache
+    // Enforce that everything has been calculated and in cache
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+
+    System.exit(0)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
new file mode 100644
index 0000000..efe2e93
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark._
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat
+
+/**
+ * Counts the rows of an HBase table through TableInputFormat, creating the table first if
+ * it is not available.
+ *
+ * Arguments: <master> <table_name>
+ */
+object HBaseTest {
+  def main(args: Array[String]): Unit = {
+    // Both arguments are read below; print usage instead of failing on a missing index.
+    if (args.length < 2) {
+      System.err.println("Usage: HBaseTest <master> <table_name>")
+      System.exit(1)
+    }
+    val tableName = args(1)
+
+    val sc = new SparkContext(args(0), "HBaseTest",
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+    val conf = HBaseConfiguration.create()
+
+    // Other options for configuring scan behavior are available. More information available at
+    // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
+    conf.set(TableInputFormat.INPUT_TABLE, tableName)
+
+    val admin = new HBaseAdmin(conf)
+    try {
+      // Initialize hBase table if necessary
+      if (!admin.isTableAvailable(tableName)) {
+        val tableDesc = new HTableDescriptor(tableName)
+        admin.createTable(tableDesc)
+      }
+
+      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
+        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
+        classOf[org.apache.hadoop.hbase.client.Result])
+
+      hBaseRDD.count()
+    } finally {
+      // Close the admin client whether or not the job succeeded.
+      admin.close()
+    }
+
+    System.exit(0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
new file mode 100644
index 0000000..d6a88d3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark._
+
+/**
+ * Reads a text file, caches the length of every line, and times ten passes over the
+ * cached RDD.
+ *
+ * Arguments: <master> <file>
+ */
+object HdfsTest {
+  def main(args: Array[String]): Unit = {
+    // Both arguments are read below; print usage instead of failing on a missing index.
+    if (args.length < 2) {
+      System.err.println("Usage: HdfsTest <master> <file>")
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(args(0), "HdfsTest",
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+    val file = sc.textFile(args(1))
+    val mapped = file.map(s => s.length).cache()
+    for (iter <- 1 to 10) {
+      val start = System.currentTimeMillis()
+      // The result is discarded; the loop only forces a full pass over the RDD.
+      mapped.foreach { x =>
+        x + 2
+        // println("Processing: " + x)
+      }
+      val end = System.currentTimeMillis()
+      println("Iteration " + iter + " took " + (end - start) + " ms")
+    }
+    System.exit(0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
new file mode 100644
index 0000000..4af45b2
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+
+/**
+ * Alternating least squares matrix factorization, run locally with COLT matrices.
+ *
+ * A rating matrix is generated from random factors; movie and user feature vectors are
+ * then updated in turn and the RMSE against the generated matrix is printed each iteration.
+ */
+object LocalALS {
+  // Parameters set through command line arguments
+  var M = 0 // Number of movies
+  var U = 0 // Number of users
+  var F = 0 // Number of features
+  var ITERATIONS = 0
+
+  val LAMBDA = 0.01 // Regularization coefficient
+
+  // Some COLT objects
+  val factory2D = DoubleFactory2D.dense
+  val factory1D = DoubleFactory1D.dense
+  val algebra = Algebra.DEFAULT
+  val blas = SeqBlas.seqBlas
+
+  /** Builds the M x U rating matrix as the product of two random factor matrices. */
+  def generateR(): DoubleMatrix2D = {
+    val movieFactors = factory2D.random(M, F)
+    val userFactors = factory2D.random(U, F)
+    algebra.mult(movieFactors, algebra.transpose(userFactors))
+  }
+
+  /**
+   * Root mean squared error between targetR and the matrix of dot products ms(i) . us(j).
+   */
+  def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+      us: Array[DoubleMatrix1D]): Double = {
+    val predicted = factory2D.make(M, U)
+    (0 until M).foreach { i =>
+      (0 until U).foreach { j =>
+        predicted.set(i, j, blas.ddot(ms(i), us(j)))
+      }
+    }
+    // predicted := predicted - targetR
+    blas.daxpy(-1, targetR, predicted)
+    val squaredErrorSum = predicted.aggregate(Functions.plus, Functions.square)
+    sqrt(squaredErrorSum / (M * U))
+  }
+
+  /**
+   * Shared solve step of updateMovie and updateUser: accumulates X^t X and X^t y over the
+   * first `count` vectors of `factors`, adds LAMBDA * count to the diagonal, and solves
+   * the system with a Cholesky decomposition.
+   *
+   * @param factors vectors held fixed during this update
+   * @param count number of vectors to use
+   * @param rating rating paired with the k-th vector
+   */
+  private def solveFactor(factors: Array[DoubleMatrix1D], count: Int,
+      rating: Int => Double): DoubleMatrix1D = {
+    val XtX = factory2D.make(F, F)
+    val Xty = factory1D.make(F)
+    (0 until count).foreach { k =>
+      val v = factors(k)
+      // Add v * v^t to XtX
+      blas.dger(1, v, v, XtX)
+      // Add v * rating to Xty
+      blas.daxpy(rating(k), v, Xty)
+    }
+    // Add regularization coefs to diagonal terms
+    (0 until F).foreach { d =>
+      XtX.set(d, d, XtX.get(d, d) + LAMBDA * count)
+    }
+    // Solve it with Cholesky
+    val cholesky = new CholeskyDecomposition(XtX)
+    val rhs = factory2D.make(Xty.toArray, F)
+    cholesky.solve(rhs).viewColumn(0)
+  }
+
+  /** Recomputes the feature vector of movie i from all user vectors and row i of R. */
+  def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+      R: DoubleMatrix2D): DoubleMatrix1D = {
+    solveFactor(us, U, j => R.get(i, j))
+  }
+
+  /** Recomputes the feature vector of user j from all movie vectors and column j of R. */
+  def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+      R: DoubleMatrix2D): DoubleMatrix1D = {
+    solveFactor(ms, M, i => R.get(i, j))
+  }
+
+  def main(args: Array[String]): Unit = {
+    args match {
+      case Array(m, u, f, iters) =>
+        M = m.toInt
+        U = u.toInt
+        F = f.toInt
+        ITERATIONS = iters.toInt
+      case _ =>
+        System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
+        System.exit(1)
+    }
+    printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
+
+    val R = generateR()
+
+    // Initialize m and u randomly
+    var ms = Array.fill(M)(factory1D.random(F))
+    var us = Array.fill(U)(factory1D.random(F))
+
+    // Iteratively update movies then users
+    (1 to ITERATIONS).foreach { iter =>
+      println("Iteration " + iter + ":")
+      // Users are updated against the movie vectors computed just above.
+      ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
+      us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
+      println("RMSE = " + rmse(R, ms, us))
+      println()
+    }
+  }
+}