You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/07 17:10:09 UTC
[1/4] git commit: Suggested small changes to Java code for slightly
more standard style, encapsulation and in some cases performance
Updated Branches:
refs/heads/master 468af0fa0 -> 15d953450
Suggested small changes to Java code for slightly more standard style, encapsulation and in some cases performance
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/66d50127
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/66d50127
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/66d50127
Branch: refs/heads/master
Commit: 66d501276b5a066bd9abaa4e284cfad557665948
Parents: 3713f81
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Jan 2 16:17:57 2014 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Jan 2 16:17:57 2014 +0000
----------------------------------------------------------------------
.../apache/spark/network/netty/FileClient.java | 5 +-
.../netty/FileClientChannelInitializer.java | 2 +-
.../apache/spark/network/netty/FileServer.java | 8 +--
.../netty/FileServerChannelInitializer.java | 4 +-
.../spark/network/netty/FileServerHandler.java | 6 +--
.../spark/network/netty/PathResolver.java | 52 ++++++++++----------
.../org/apache/spark/examples/JavaHdfsLR.java | 32 +++++++-----
.../org/apache/spark/examples/JavaKMeans.java | 26 +++++++---
.../org/apache/spark/examples/JavaLogQuery.java | 23 +++++----
.../org/apache/spark/examples/JavaPageRank.java | 14 ++++--
.../org/apache/spark/examples/JavaSparkPi.java | 11 +++--
.../java/org/apache/spark/examples/JavaTC.java | 19 ++++---
.../apache/spark/examples/JavaWordCount.java | 13 ++++-
.../apache/spark/mllib/examples/JavaALS.java | 21 +++++---
.../apache/spark/mllib/examples/JavaKMeans.java | 19 ++++---
.../org/apache/spark/mllib/examples/JavaLR.java | 22 ++++++---
.../streaming/examples/JavaFlumeEventCount.java | 5 +-
.../streaming/examples/JavaKafkaWordCount.java | 16 ++++--
.../examples/JavaNetworkWordCount.java | 15 ++++--
.../streaming/examples/JavaQueueStream.java | 11 +++--
20 files changed, 203 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/FileClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 46d6150..d2d778b 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -31,7 +31,8 @@ import java.util.concurrent.TimeUnit;
class FileClient {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());
+
private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
@@ -39,7 +40,7 @@ class FileClient {
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min
- public FileClient(FileClientHandler handler, int connectTimeout) {
+ FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index fb61be1..264cf97 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private final FileClientHandler fhandler;
- public FileClientChannelInitializer(FileClientHandler handler) {
+ FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/FileServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index aea7534..c93425e 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -33,15 +33,14 @@ import org.slf4j.LoggerFactory;
*/
class FileServer {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
- private Thread blockingThread = null;
- public FileServer(PathResolver pResolver, int port) {
+ FileServer(PathResolver pResolver, int port) {
InetSocketAddress addr = new InetSocketAddress(port);
// Configure the server.
@@ -70,7 +69,8 @@ class FileServer {
* Start the file server asynchronously in a new thread.
*/
public void start() {
- blockingThread = new Thread() {
+ Thread blockingThread = new Thread() {
+ @Override
public void run() {
try {
channelFuture.channel().closeFuture().sync();
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 3f15ff8..46efec8 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -25,9 +25,9 @@ import io.netty.handler.codec.string.StringDecoder;
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
- PathResolver pResolver;
+ private final PathResolver pResolver;
- public FileServerChannelInitializer(PathResolver pResolver) {
+ FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index e2d9391..3ac045f 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
class FileServerHandler extends SimpleChannelInboundHandler<String> {
- private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+ private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());
private final PathResolver pResolver;
- public FileServerHandler(PathResolver pResolver){
+ FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}
@@ -61,7 +61,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
ctx.flush();
return;
}
- int len = new Long(length).intValue();
+ int len = (int) length;
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.write(new DefaultFileRegion(new FileInputStream(file)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
index 9f7ced4..7ad8d03 100755
--- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
+++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java
@@ -1,26 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty;
-
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.FileSegment;
-
-public interface PathResolver {
- /** Get the file segment in which the given block resides. */
- public FileSegment getBlockLocation(BlockId blockId);
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty;
+
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.FileSegment;
+
+public interface PathResolver {
+ /** Get the file segment in which the given block resides. */
+ FileSegment getBlockLocation(BlockId blockId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index be0d385..9f0e341 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -24,19 +24,22 @@ import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.StringTokenizer;
import java.util.Random;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification.
*/
-public class JavaHdfsLR {
+public final class JavaHdfsLR {
- static int D = 10; // Number of dimensions
- static Random rand = new Random(42);
+ private static final int D = 10; // Number of dimensions
+ private static final Random rand = new Random(42);
+
+ private JavaHdfsLR() {
+ }
static class DataPoint implements Serializable {
- public DataPoint(double[] x, double y) {
+ DataPoint(double[] x, double y) {
this.x = x;
this.y = y;
}
@@ -46,20 +49,22 @@ public class JavaHdfsLR {
}
static class ParsePoint extends Function<String, DataPoint> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public DataPoint call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- double y = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
- int i = 0;
- while (i < D) {
- x[i] = Double.parseDouble(tok.nextToken());
- i += 1;
+ for (int i = 0; i < D; i++) {
+ x[i] = Double.parseDouble(tok[i+1]);
}
return new DataPoint(x, y);
}
}
static class VectorSum extends Function2<double[], double[], double[]> {
+ @Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
@@ -70,12 +75,13 @@ public class JavaHdfsLR {
}
static class ComputeGradient extends Function<DataPoint, double[]> {
- double[] weights;
+ private final double[] weights;
- public ComputeGradient(double[] weights) {
+ ComputeGradient(double[] weights) {
this.weights = weights;
}
+ @Override
public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 5a6afe7..1671d0c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -27,19 +27,27 @@ import org.apache.spark.util.Vector;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* K-means clustering using Java API.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
+
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKMeans() {
+ }
/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
- String[] splits = line.split(" ");
+ String[] splits = SPACE.split(line);
double[] data = new double[splits.length];
int i = 0;
- for (String s : splits)
- data[i] = Double.parseDouble(splits[i++]);
+ for (String s : splits) {
+ data[i] = Double.parseDouble(s);
+ i++;
+ }
return new Vector(data);
}
@@ -82,7 +90,7 @@ public class JavaKMeans {
JavaRDD<Vector> data = sc.textFile(path).map(
new Function<String, Vector>() {
@Override
- public Vector call(String line) throws Exception {
+ public Vector call(String line) {
return parseVector(line);
}
}
@@ -96,7 +104,7 @@ public class JavaKMeans {
JavaPairRDD<Integer, Vector> closest = data.map(
new PairFunction<Vector, Integer, Vector>() {
@Override
- public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
+ public Tuple2<Integer, Vector> call(Vector vector) {
return new Tuple2<Integer, Vector>(
closestPoint(vector, centroids), vector);
}
@@ -107,7 +115,8 @@ public class JavaKMeans {
JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
new Function<List<Vector>, Vector>() {
- public Vector call(List<Vector> ps) throws Exception {
+ @Override
+ public Vector call(List<Vector> ps) {
return average(ps);
}
}).collectAsMap();
@@ -122,8 +131,9 @@ public class JavaKMeans {
} while (tempDist > convergeDist);
System.out.println("Final centers:");
- for (Vector c : centroids)
+ for (Vector c : centroids) {
System.out.println(c);
+ }
System.exit(0);
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 407cd7c..1ce53fe 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -35,9 +35,9 @@ import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
*/
-public class JavaLogQuery {
+public final class JavaLogQuery {
- public static List<String> exampleApacheLogs = Lists.newArrayList(
+ public static final List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -51,14 +51,17 @@ public class JavaLogQuery {
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");
- public static Pattern apacheLogRegex = Pattern.compile(
+ public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
+ private JavaLogQuery() {
+ }
+
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
- private int count;
- private int numBytes;
+ private final int count;
+ private final int numBytes;
public Stats(int count, int numBytes) {
this.count = count;
@@ -92,12 +95,12 @@ public class JavaLogQuery {
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
- }
- else
+ } else {
return new Stats(1, 0);
+ }
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
@@ -110,14 +113,14 @@ public class JavaLogQuery {
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
- public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
+ public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
- public Stats call(Stats stats, Stats stats2) throws Exception {
+ public Stats call(Stats stats, Stats stats2) {
return stats.merge(stats2);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 89aed8f..447ba93 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
+import java.util.regex.Pattern;
/**
* Computes the PageRank of URLs from an input file. Input file should
@@ -38,7 +39,12 @@ import java.util.ArrayList;
* ...
* where URL and their neighbors are separated by space(s).
*/
-public class JavaPageRank {
+public final class JavaPageRank {
+ private static final Pattern SPACES = Pattern.compile("\\s+");
+
+ private JavaPageRank() {
+ }
+
private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
@@ -66,7 +72,7 @@ public class JavaPageRank {
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
- String[] parts = s.split("\\s+");
+ String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
@@ -74,7 +80,7 @@ public class JavaPageRank {
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
- public Double call(List<String> rs) throws Exception {
+ public Double call(List<String> rs) {
return 1.0;
}
});
@@ -97,7 +103,7 @@ public class JavaPageRank {
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
@Override
- public Double call(Double sum) throws Exception {
+ public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 4a2380c..d2a2a1d 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
import java.util.List;
/** Computes an approximation to pi */
-public class JavaSparkPi {
+public final class JavaSparkPi {
+ private JavaSparkPi() {
+ }
public static void main(String[] args) throws Exception {
if (args.length == 0) {
@@ -41,21 +43,22 @@ public class JavaSparkPi {
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
- for (int i = 0; i < n; i++)
+ for (int i = 0; i < n; i++) {
l.add(i);
+ }
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
- public Integer call(Integer integer) throws Exception {
+ public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
+ public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 17f21f6..e61b9c4 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -31,11 +31,14 @@ import java.util.Set;
/**
* Transitive closure on a graph, implemented in Java.
*/
-public class JavaTC {
+public final class JavaTC {
- static int numEdges = 200;
- static int numVertices = 100;
- static Random rand = new Random(42);
+ private static final int numEdges = 200;
+ private static final int numVertices = 100;
+ private static final Random rand = new Random(42);
+
+ private JavaTC() {
+ }
static List<Tuple2<Integer, Integer>> generateGraph() {
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
@@ -43,15 +46,18 @@ public class JavaTC {
int from = rand.nextInt(numVertices);
int to = rand.nextInt(numVertices);
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
- if (from != to) edges.add(e);
+ if (from != to) {
+ edges.add(e);
+ }
}
return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
Integer, Integer> {
- static ProjectFn INSTANCE = new ProjectFn();
+ static final ProjectFn INSTANCE = new ProjectFn();
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
}
@@ -76,6 +82,7 @@ public class JavaTC {
// Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
return new Tuple2<Integer, Integer>(e._2(), e._1());
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index bd6383e..ed4e9b4 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,8 +27,14 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Pattern;
+
+public final class JavaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaWordCount() {
+ }
-public class JavaWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
@@ -40,18 +46,21 @@ public class JavaWordCount {
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
public Iterable<String> call(String s) {
- return Arrays.asList(s.split(" "));
+ return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+ @Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 45a0d23..b33e648 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -26,28 +26,35 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
import scala.Tuple2;
/**
* Example using MLLib ALS from Java.
*/
-public class JavaALS {
+public final class JavaALS {
+
+ private JavaALS() {
+ }
static class ParseRating extends Function<String, Rating> {
+ private static final Pattern COMMA = Pattern.compile(",");
+
+ @Override
public Rating call(String line) {
- StringTokenizer tok = new StringTokenizer(line, ",");
- int x = Integer.parseInt(tok.nextToken());
- int y = Integer.parseInt(tok.nextToken());
- double rating = Double.parseDouble(tok.nextToken());
+ String[] tok = COMMA.split(line);
+ int x = Integer.parseInt(tok[0]);
+ int y = Integer.parseInt(tok[1]);
+ double rating = Double.parseDouble(tok[2]);
return new Rating(x, y, rating);
}
}
static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+ @Override
public String call(Tuple2<Object, double[]> element) {
- return element._1().toString() + "," + Arrays.toString(element._2());
+ return element._1() + "," + Arrays.toString(element._2());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index cd59a13..a9db04d 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -25,20 +25,25 @@ import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Example using MLLib KMeans from Java.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
+
+ private JavaKMeans() {
+ }
static class ParsePoint extends Function<String, double[]> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public double[] call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- int numTokens = tok.countTokens();
- double[] point = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- point[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double[] point = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ point[i] = Double.parseDouble(tok[i]);
}
return point;
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 258061c..5634131 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -27,22 +27,28 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification using ML Lib.
*/
-public class JavaLR {
+public final class JavaLR {
+
+ private JavaLR() {
+ }
static class ParsePoint extends Function<String, LabeledPoint> {
+ private static final Pattern COMMA = Pattern.compile(",");
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public LabeledPoint call(String line) {
- String[] parts = line.split(",");
+ String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
- StringTokenizer tok = new StringTokenizer(parts[1], " ");
- int numTokens = tok.countTokens();
- double[] x = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- x[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(parts[1]);
+ double[] x = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 261813b..bd0bbb5 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -36,7 +36,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent;
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*/
-public class JavaFlumeEventCount {
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 22994fb..17eb871 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples;
import java.util.Map;
import java.util.HashMap;
+import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -45,7 +46,12 @@ import scala.Tuple2;
* zoo03 my-consumer-group topic1,topic2 1`
*/
-public class JavaKafkaWordCount {
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
@@ -67,7 +73,7 @@ public class JavaKafkaWordCount {
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
+ public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
@@ -75,19 +81,19 @@ public class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index def87c1..fb090cc 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import java.util.regex.Pattern;
+
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: NetworkWordCount <master> <hostname> <port>
@@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
* and then run the example
* `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
-public class JavaNetworkWordCount {
+public final class JavaNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaNetworkWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
@@ -56,18 +63,18 @@ public class JavaNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/66d50127/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index c8c7389..6be9672 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -31,8 +31,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-public class JavaQueueStream {
- public static void main(String[] args) throws InterruptedException {
+public final class JavaQueueStream {
+ private JavaQueueStream() {
+ }
+
+ public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaQueueStream <master>");
System.exit(1);
@@ -62,14 +65,14 @@ public class JavaQueueStream {
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ public Tuple2<Integer, Integer> call(Integer i) {
return new Tuple2<Integer, Integer>(i % 10, 1);
}
});
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
[2/4] git commit: Merge remote-tracking branch 'upstream/master'
Posted by rx...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7379b291
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7379b291
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7379b291
Branch: refs/heads/master
Commit: 7379b2915f5b75476932e2098b83050f7726829e
Parents: 66d5012 a2e7e04
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Jan 6 15:13:16 2014 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 6 15:13:16 2014 +0000
----------------------------------------------------------------------
.gitignore | 2 +
README.md | 28 +-
assembly/lib/PY4J_LICENSE.txt | 27 -
assembly/lib/PY4J_VERSION.txt | 1 -
assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.jar | Bin 103286 -> 0 bytes
assembly/lib/net/sf/py4j/py4j/0.7/py4j-0.7.pom | 9 -
.../net/sf/py4j/py4j/maven-metadata-local.xml | 12 -
assembly/pom.xml | 14 +-
assembly/src/main/assembly/assembly.xml | 11 +-
bin/compute-classpath.cmd | 2 +-
bin/compute-classpath.sh | 2 +-
bin/pyspark | 70 ++
bin/pyspark.cmd | 23 +
bin/pyspark2.cmd | 55 +
bin/run-example | 91 ++
bin/run-example.cmd | 23 +
bin/run-example2.cmd | 61 ++
bin/slaves.sh | 91 --
bin/spark-class | 154 +++
bin/spark-class.cmd | 23 +
bin/spark-class2.cmd | 85 ++
bin/spark-config.sh | 36 -
bin/spark-daemon.sh | 183 ----
bin/spark-daemons.sh | 35 -
bin/spark-shell | 102 ++
bin/spark-shell.cmd | 23 +
bin/start-all.sh | 34 -
bin/start-master.sh | 52 -
bin/start-slave.sh | 35 -
bin/start-slaves.sh | 48 -
bin/stop-all.sh | 32 -
bin/stop-master.sh | 27 -
bin/stop-slaves.sh | 35 -
.../scala/org/apache/spark/SparkContext.scala | 34 +-
.../spark/api/java/JavaSparkContext.scala | 20 +
.../org/apache/spark/deploy/client/Client.scala | 12 +-
.../org/apache/spark/executor/Executor.scala | 24 +-
.../spark/rdd/PartitionerAwareUnionRDD.scala | 110 ++
.../main/scala/org/apache/spark/rdd/RDD.scala | 2 +-
.../apache/spark/rdd/RDDCheckpointData.scala | 2 +-
.../spark/scheduler/TaskSchedulerImpl.scala | 5 +-
.../apache/spark/scheduler/TaskSetManager.scala | 5 -
.../mesos/CoarseMesosSchedulerBackend.scala | 4 +-
.../cluster/mesos/MesosSchedulerBackend.scala | 4 +-
.../apache/spark/serializer/Serializer.scala | 3 +
.../spark/serializer/SerializerManager.scala | 15 +-
.../apache/spark/ui/UIWorkloadGenerator.scala | 4 +-
.../scala/org/apache/spark/util/AkkaUtils.scala | 16 +-
.../test/resources/uncommons-maths-1.2.2.jar | Bin 49019 -> 0 bytes
.../org/apache/spark/CheckpointSuite.scala | 361 ++++---
.../scala/org/apache/spark/DriverSuite.scala | 8 +-
.../org/apache/spark/FileServerSuite.scala | 108 +-
.../deploy/worker/ExecutorRunnerTest.scala | 4 +-
.../scala/org/apache/spark/rdd/RDDSuite.scala | 27 +
data/kmeans_data.txt | 6 +
data/lr_data.txt | 1000 ++++++++++++++++++
data/pagerank_data.txt | 6 +
docs/README.md | 4 +-
docs/_plugins/copy_api_dirs.rb | 4 +-
docs/api.md | 2 +-
docs/bagel-programming-guide.md | 4 +-
docs/building-with-maven.md | 14 +-
docs/hadoop-third-party-distributions.md | 2 +-
docs/index.md | 16 +-
docs/java-programming-guide.md | 4 +-
docs/mllib-guide.md | 2 +-
docs/python-programming-guide.md | 30 +-
docs/quick-start.md | 10 +-
docs/running-on-yarn.md | 17 +-
docs/scala-programming-guide.md | 16 +-
docs/spark-debugger.md | 2 +-
docs/spark-standalone.md | 20 +-
docs/streaming-programming-guide.md | 4 +-
ec2/spark_ec2.py | 2 +-
.../org/apache/spark/examples/JavaHdfsLR.java | 2 +-
.../org/apache/spark/examples/JavaKMeans.java | 2 +-
.../org/apache/spark/examples/JavaLogQuery.java | 2 +-
.../org/apache/spark/examples/JavaPageRank.java | 3 +-
.../org/apache/spark/examples/JavaSparkPi.java | 2 +-
.../java/org/apache/spark/examples/JavaTC.java | 2 +-
.../apache/spark/examples/JavaWordCount.java | 2 +-
.../apache/spark/mllib/examples/JavaALS.java | 2 +-
.../apache/spark/mllib/examples/JavaKMeans.java | 2 +-
.../org/apache/spark/mllib/examples/JavaLR.java | 2 +-
.../streaming/examples/JavaFlumeEventCount.java | 3 +-
.../streaming/examples/JavaKafkaWordCount.java | 5 +-
.../examples/JavaNetworkWordCount.java | 3 +-
.../streaming/examples/JavaQueueStream.java | 2 +-
.../apache/spark/examples/BroadcastTest.scala | 2 +-
.../spark/examples/ExceptionHandlingTest.scala | 2 +-
.../org/apache/spark/examples/GroupByTest.scala | 2 +-
.../org/apache/spark/examples/HBaseTest.scala | 2 +-
.../org/apache/spark/examples/HdfsTest.scala | 2 +-
.../org/apache/spark/examples/LogQuery.scala | 2 +-
.../spark/examples/MultiBroadcastTest.scala | 2 +-
.../examples/SimpleSkewedGroupByTest.scala | 2 +-
.../spark/examples/SkewedGroupByTest.scala | 2 +-
.../org/apache/spark/examples/SparkALS.scala | 2 +-
.../org/apache/spark/examples/SparkHdfsLR.scala | 2 +-
.../org/apache/spark/examples/SparkKMeans.scala | 2 +-
.../org/apache/spark/examples/SparkLR.scala | 2 +-
.../apache/spark/examples/SparkPageRank.scala | 2 +-
.../org/apache/spark/examples/SparkPi.scala | 2 +-
.../org/apache/spark/examples/SparkTC.scala | 2 +-
.../streaming/examples/ActorWordCount.scala | 6 +-
.../streaming/examples/FlumeEventCount.scala | 2 +-
.../streaming/examples/HdfsWordCount.scala | 4 +-
.../streaming/examples/KafkaWordCount.scala | 4 +-
.../streaming/examples/MQTTWordCount.scala | 6 +-
.../streaming/examples/NetworkWordCount.scala | 4 +-
.../spark/streaming/examples/QueueStream.scala | 2 +-
.../streaming/examples/RawNetworkGrep.scala | 2 +-
.../examples/StatefulNetworkWordCount.scala | 4 +-
.../streaming/examples/TwitterAlgebirdCMS.scala | 2 +-
.../streaming/examples/TwitterAlgebirdHLL.scala | 2 +-
.../streaming/examples/TwitterPopularTags.scala | 2 +-
.../streaming/examples/ZeroMQWordCount.scala | 6 +-
.../clickstream/PageViewGenerator.scala | 4 +-
.../examples/clickstream/PageViewStream.scala | 6 +-
kmeans_data.txt | 6 -
lr_data.txt | 1000 ------------------
make-distribution.sh | 24 +-
.../spark/mllib/classification/NaiveBayes.scala | 119 +++
.../mllib/classification/NaiveBayesSuite.scala | 108 ++
new-yarn/pom.xml | 161 ---
.../spark/deploy/yarn/ApplicationMaster.scala | 446 --------
.../yarn/ApplicationMasterArguments.scala | 94 --
.../org/apache/spark/deploy/yarn/Client.scala | 521 ---------
.../spark/deploy/yarn/ClientArguments.scala | 149 ---
.../yarn/ClientDistributedCacheManager.scala | 228 ----
.../spark/deploy/yarn/WorkerLauncher.scala | 222 ----
.../spark/deploy/yarn/WorkerRunnable.scala | 209 ----
.../deploy/yarn/YarnAllocationHandler.scala | 687 ------------
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 -
.../cluster/YarnClientClusterScheduler.scala | 48 -
.../cluster/YarnClientSchedulerBackend.scala | 110 --
.../cluster/YarnClusterScheduler.scala | 56 -
.../ClientDistributedCacheManagerSuite.scala | 220 ----
pagerank_data.txt | 6 -
pom.xml | 59 +-
project/SparkBuild.scala | 48 +-
pyspark | 70 --
pyspark.cmd | 23 -
pyspark2.cmd | 55 -
python/lib/py4j-0.8.1-src.zip | Bin 0 -> 37662 bytes
python/lib/py4j0.7.egg | Bin 191756 -> 0 bytes
python/pyspark/__init__.py | 2 +-
python/pyspark/java_gateway.py | 2 +-
python/pyspark/rdd.py | 66 +-
python/pyspark/shell.py | 2 +-
python/run-tests | 2 +-
repl-bin/src/deb/bin/run | 3 +-
repl/pom.xml | 1 -
.../org/apache/spark/repl/SparkILoop.scala | 4 +-
run-example | 91 --
run-example.cmd | 23 -
run-example2.cmd | 61 --
sbin/slaves.sh | 91 ++
sbin/spark-config.sh | 36 +
sbin/spark-daemon.sh | 183 ++++
sbin/spark-daemons.sh | 35 +
sbin/spark-executor | 23 +
sbin/start-all.sh | 34 +
sbin/start-master.sh | 52 +
sbin/start-slave.sh | 35 +
sbin/start-slaves.sh | 48 +
sbin/stop-all.sh | 32 +
sbin/stop-master.sh | 27 +
sbin/stop-slaves.sh | 35 +
sbt/sbt | 43 -
sbt/sbt-launch-0.11.3-2.jar | Bin 1096763 -> 0 bytes
sbt/sbt.cmd | 25 -
spark-class | 154 ---
spark-class.cmd | 23 -
spark-class2.cmd | 85 --
spark-executor | 22 -
spark-shell | 102 --
spark-shell.cmd | 22 -
.../spark/streaming/PairDStreamFunctions.scala | 13 +-
.../spark/streaming/StreamingContext.scala | 6 +
.../streaming/api/java/JavaPairDStream.scala | 18 +-
.../api/java/JavaStreamingContext.scala | 25 +-
.../streaming/dstream/ShuffledDStream.scala | 9 +-
.../streaming/dstream/WindowedDStream.scala | 16 +-
.../spark/streaming/WindowOperationsSuite.scala | 4 +-
yarn/README.md | 12 +
yarn/alpha/pom.xml | 32 +
.../spark/deploy/yarn/ApplicationMaster.scala | 464 ++++++++
.../org/apache/spark/deploy/yarn/Client.scala | 509 +++++++++
.../spark/deploy/yarn/WorkerLauncher.scala | 250 +++++
.../spark/deploy/yarn/WorkerRunnable.scala | 236 +++++
.../deploy/yarn/YarnAllocationHandler.scala | 680 ++++++++++++
.../yarn/ApplicationMasterArguments.scala | 94 ++
.../spark/deploy/yarn/ClientArguments.scala | 150 +++
.../yarn/ClientDistributedCacheManager.scala | 228 ++++
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 +
.../cluster/YarnClientClusterScheduler.scala | 48 +
.../cluster/YarnClientSchedulerBackend.scala | 110 ++
.../cluster/YarnClusterScheduler.scala | 56 +
.../ClientDistributedCacheManagerSuite.scala | 220 ++++
yarn/pom.xml | 84 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 477 ---------
.../yarn/ApplicationMasterArguments.scala | 94 --
.../org/apache/spark/deploy/yarn/Client.scala | 505 ---------
.../spark/deploy/yarn/ClientArguments.scala | 146 ---
.../yarn/ClientDistributedCacheManager.scala | 228 ----
.../spark/deploy/yarn/WorkerLauncher.scala | 243 -----
.../spark/deploy/yarn/WorkerRunnable.scala | 235 ----
.../deploy/yarn/YarnAllocationHandler.scala | 673 ------------
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 -
.../cluster/YarnClientClusterScheduler.scala | 48 -
.../cluster/YarnClientSchedulerBackend.scala | 110 --
.../cluster/YarnClusterScheduler.scala | 59 --
.../ClientDistributedCacheManagerSuite.scala | 220 ----
yarn/stable/pom.xml | 32 +
.../spark/deploy/yarn/ApplicationMaster.scala | 432 ++++++++
.../org/apache/spark/deploy/yarn/Client.scala | 525 +++++++++
.../spark/deploy/yarn/WorkerLauncher.scala | 230 ++++
.../spark/deploy/yarn/WorkerRunnable.scala | 210 ++++
.../deploy/yarn/YarnAllocationHandler.scala | 695 ++++++++++++
220 files changed, 8767 insertions(+), 9302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7379b291/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
----------------------------------------------------------------------
[4/4] git commit: Merge pull request #318 from srowen/master
Posted by rx...@apache.org.
Merge pull request #318 from srowen/master
Suggested small changes to Java code for slightly more standard style, encapsulation and in some cases performance
Sorry if this is too abrupt or not a welcome set of changes, but thought I'd see if I could contribute a little. I'm a Java developer and just getting seriously into Spark. So I thought I'd suggest a number of small changes to the couple of Java parts of the code to make it a little tighter, more standard and even a bit faster.
Feel free to take all, some or none of this. Happy to explain any of it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/15d95345
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/15d95345
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/15d95345
Branch: refs/heads/master
Commit: 15d953450167c4ec45c9d0a2c7ab8ee71be2e576
Parents: 468af0f 4b92a20
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jan 7 08:10:02 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jan 7 08:10:02 2014 -0800
----------------------------------------------------------------------
.../apache/spark/network/netty/FileClient.java | 5 +-
.../netty/FileClientChannelInitializer.java | 2 +-
.../apache/spark/network/netty/FileServer.java | 8 +--
.../netty/FileServerChannelInitializer.java | 4 +-
.../spark/network/netty/FileServerHandler.java | 6 +--
.../spark/network/netty/PathResolver.java | 52 ++++++++++----------
.../org/apache/spark/examples/JavaHdfsLR.java | 29 ++++++-----
.../org/apache/spark/examples/JavaKMeans.java | 23 ++++++---
.../org/apache/spark/examples/JavaLogQuery.java | 20 ++++----
.../org/apache/spark/examples/JavaPageRank.java | 12 +++--
.../org/apache/spark/examples/JavaSparkPi.java | 10 ++--
.../java/org/apache/spark/examples/JavaTC.java | 16 +++---
.../apache/spark/examples/JavaWordCount.java | 10 +++-
.../apache/spark/mllib/examples/JavaALS.java | 18 ++++---
.../apache/spark/mllib/examples/JavaKMeans.java | 16 +++---
.../org/apache/spark/mllib/examples/JavaLR.java | 19 ++++---
.../streaming/examples/JavaFlumeEventCount.java | 5 +-
.../streaming/examples/JavaKafkaWordCount.java | 16 ++++--
.../examples/JavaNetworkWordCount.java | 15 ++++--
.../streaming/examples/JavaQueueStream.java | 11 +++--
20 files changed, 174 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
[3/4] git commit: Issue #318 : minor style updates per review from
Reynold Xin
Posted by rx...@apache.org.
Issue #318 : minor style updates per review from Reynold Xin
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4b92a202
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4b92a202
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4b92a202
Branch: refs/heads/master
Commit: 4b92a20232bc24fd858ed4eb7c45462241e36829
Parents: 7379b29
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jan 7 09:38:45 2014 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 7 09:38:45 2014 +0000
----------------------------------------------------------------------
.../src/main/java/org/apache/spark/examples/JavaHdfsLR.java | 5 +----
.../src/main/java/org/apache/spark/examples/JavaKMeans.java | 3 ---
.../src/main/java/org/apache/spark/examples/JavaLogQuery.java | 3 ---
.../src/main/java/org/apache/spark/examples/JavaPageRank.java | 4 ----
.../src/main/java/org/apache/spark/examples/JavaSparkPi.java | 3 ---
examples/src/main/java/org/apache/spark/examples/JavaTC.java | 3 ---
.../src/main/java/org/apache/spark/examples/JavaWordCount.java | 3 ---
.../src/main/java/org/apache/spark/mllib/examples/JavaALS.java | 5 +----
.../main/java/org/apache/spark/mllib/examples/JavaKMeans.java | 3 ---
.../src/main/java/org/apache/spark/mllib/examples/JavaLR.java | 3 ---
10 files changed, 2 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index 71bd3b4..d552c47 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -35,9 +35,6 @@ public final class JavaHdfsLR {
private static final int D = 10; // Number of dimensions
private static final Random rand = new Random(42);
- private JavaHdfsLR() {
- }
-
static class DataPoint implements Serializable {
DataPoint(double[] x, double y) {
this.x = x;
@@ -57,7 +54,7 @@ public final class JavaHdfsLR {
double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
for (int i = 0; i < D; i++) {
- x[i] = Double.parseDouble(tok[i+1]);
+ x[i] = Double.parseDouble(tok[i + 1]);
}
return new DataPoint(x, y);
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 0808f33..0dc8792 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -36,9 +36,6 @@ public final class JavaKMeans {
private static final Pattern SPACE = Pattern.compile(" ");
- private JavaKMeans() {
- }
-
/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
String[] splits = SPACE.split(line);
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index d45d96d..9eb1cad 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -54,9 +54,6 @@ public final class JavaLogQuery {
public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
- private JavaLogQuery() {
- }
-
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 12d2cce..a84245b 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,6 @@
package org.apache.spark.examples;
-import org.apache.spark.SparkContext;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -43,9 +42,6 @@ import java.util.regex.Pattern;
public final class JavaPageRank {
private static final Pattern SPACES = Pattern.compile("\\s+");
- private JavaPageRank() {
- }
-
private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index f6ed510..3ec4a58 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -28,9 +28,6 @@ import java.util.List;
/** Computes an approximation to pi */
public final class JavaSparkPi {
- private JavaSparkPi() {
- }
-
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [slices]");
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaTC.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 12b564d..2ceb0fd 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -37,9 +37,6 @@ public final class JavaTC {
private static final int numVertices = 100;
private static final Random rand = new Random(42);
- private JavaTC() {
- }
-
static List<Tuple2<Integer, Integer>> generateGraph() {
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
while (edges.size() < numEdges) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index fc9beb8..6651f98 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -32,9 +32,6 @@ import java.util.regex.Pattern;
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- private JavaWordCount() {
- }
-
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index c42d9cb..435a86e 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -33,10 +33,7 @@ import scala.Tuple2;
/**
* Example using MLLib ALS from Java.
*/
-public final class JavaALS {
-
- private JavaALS() {
- }
+public final class JavaALS {
static class ParseRating extends Function<String, Rating> {
private static final Pattern COMMA = Pattern.compile(",");
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index 9d10473..4b2658f 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -32,9 +32,6 @@ import java.util.regex.Pattern;
*/
public final class JavaKMeans {
- private JavaKMeans() {
- }
-
static class ParsePoint extends Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4b92a202/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index b057f71..21586ce 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -34,9 +34,6 @@ import java.util.regex.Pattern;
*/
public final class JavaLR {
- private JavaLR() {
- }
-
static class ParsePoint extends Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");