You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 11:04:13 UTC
[4/7] flink git commit: [FLINK-6707] [examples] Activate strict checkstyle for flink-examples
[FLINK-6707] [examples] Activate strict checkstyle for flink-examples
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/789ed8a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/789ed8a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/789ed8a8
Branch: refs/heads/master
Commit: 789ed8a8246d140e1621a5860645a747132d6618
Parents: d481f29
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 24 13:24:02 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 09:48:30 2017 +0200
----------------------------------------------------------------------
.../flink/examples/java/clustering/KMeans.java | 31 +-
.../java/clustering/util/KMeansData.java | 4 +-
.../clustering/util/KMeansDataGenerator.java | 68 +-
.../flink/examples/java/distcp/DistCp.java | 12 +-
.../examples/java/distcp/FileCopyTask.java | 9 +-
.../java/distcp/FileCopyTaskInputFormat.java | 6 +-
.../java/distcp/FileCopyTaskInputSplit.java | 6 +-
.../java/graph/ConnectedComponents.java | 74 +-
.../examples/java/graph/EnumTriangles.java | 70 +-
.../flink/examples/java/graph/PageRank.java | 84 +--
.../java/graph/TransitiveClosureNaive.java | 22 +-
.../graph/util/ConnectedComponentsData.java | 18 +-
.../java/graph/util/EnumTrianglesData.java | 19 +-
.../java/graph/util/EnumTrianglesDataTypes.java | 128 ++--
.../examples/java/graph/util/PageRankData.java | 22 +-
.../java/misc/CollectionExecutionExample.java | 50 +-
.../flink/examples/java/misc/PiEstimation.java | 35 +-
.../examples/java/ml/LinearRegression.java | 80 +--
.../java/ml/util/LinearRegressionData.java | 2 +-
.../ml/util/LinearRegressionDataGenerator.java | 21 +-
.../relational/EmptyFieldsCountAccumulator.java | 28 +-
.../examples/java/relational/TPCHQuery10.java | 107 ++-
.../examples/java/relational/TPCHQuery3.java | 183 +++--
.../java/relational/WebLogAnalysis.java | 72 +-
.../java/relational/util/WebLogData.java | 709 +++++++++----------
.../relational/util/WebLogDataGenerator.java | 27 +-
.../examples/java/wordcount/WordCount.java | 33 +-
.../examples/java/wordcount/WordCountPojo.java | 38 +-
.../examples/scala/graph/EnumTriangles.scala | 11 +-
.../examples/scala/graph/PageRankBasic.scala | 3 +-
.../examples/scala/relational/TPCHQuery10.scala | 3 +-
.../examples/scala/relational/TPCHQuery3.scala | 3 +-
.../examples/async/AsyncIOExample.java | 13 +-
.../examples/iteration/IterateExample.java | 14 +-
.../iteration/util/IterateExampleData.java | 3 +
.../streaming/examples/join/WindowJoin.java | 17 +-
.../streaming/examples/kafka/ReadFromKafka.java | 5 +-
.../examples/kafka/WriteIntoKafka.java | 12 +-
.../ml/IncrementalLearningSkeleton.java | 8 +-
.../util/IncrementalLearningSkeletonData.java | 3 +
.../examples/sideoutput/SideOutputExample.java | 16 +-
.../examples/socket/SocketWindowWordCount.java | 4 +-
.../examples/twitter/TwitterExample.java | 31 +-
.../twitter/util/TwitterExampleData.java | 4 +-
.../examples/utils/ThrottledIterator.java | 4 +-
.../GroupedProcessingTimeWindowExample.java | 40 +-
.../examples/windowing/SessionWindowing.java | 6 +-
.../examples/windowing/TopSpeedWindowing.java | 2 +-
.../examples/windowing/WindowWordCount.java | 12 +-
.../windowing/util/SessionWindowingData.java | 3 +
.../util/TopSpeedWindowingExampleData.java | 3 +
.../examples/wordcount/PojoExample.java | 14 +-
.../streaming/examples/wordcount/WordCount.java | 16 +-
.../scala/examples/join/WindowJoin.scala | 2 +-
.../iteration/IterateExampleITCase.java | 4 +-
.../join/WindowJoinData.java | 2 +-
.../join/WindowJoinITCase.java | 19 +-
.../ml/IncrementalLearningSkeletonITCase.java | 3 +
.../twitter/TwitterStreamITCase.java | 3 +
.../windowing/SessionWindowingITCase.java | 3 +
.../TopSpeedWindowingExampleITCase.java | 5 +-
.../windowing/WindowWordCountITCase.java | 3 +
.../wordcount/PojoExampleITCase.java | 3 +
.../wordcount/WordCountITCase.java | 3 +
.../TopSpeedWindowingExampleITCase.java | 5 +-
.../socket/SocketWindowWordCountITCase.java | 42 +-
.../src/test/resources/log4j-test.properties | 2 +-
.../scala/examples/WindowJoinITCase.scala | 6 +-
.../flink/table/examples/java/WordCountSQL.java | 10 +-
.../table/examples/java/WordCountTable.java | 21 +-
flink-examples/pom.xml | 50 +-
71 files changed, 1251 insertions(+), 1143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
index 8e51df8..101eda3 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -18,27 +18,26 @@
package org.apache.flink.examples.java.clustering;
-import java.io.Serializable;
-import java.util.Collection;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.clustering.util.KMeansData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
+
+import java.io.Serializable;
+import java.util.Collection;
/**
* This example implements a basic K-Means clustering algorithm.
*
- * <p>
- * K-Means is an iterative clustering algorithm and works as follows:<br>
+ * <p>K-Means is an iterative clustering algorithm and works as follows:<br>
* K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
* In each iteration, the algorithm computes the distance of each data point to each cluster center.
* Each point is assigned to the cluster center which is closest to it.
@@ -48,13 +47,11 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
* or if cluster centers do not (significantly) move in an iteration.<br>
* This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>.
*
- * <p>
- * This implementation works on two-dimensional data points. <br>
+ * <p>This implementation works on two-dimensional data points. <br>
* It computes an assignment of data points to cluster centers, i.e.,
* each data point is annotated with the id of the final cluster (center) it belongs to.
*
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ * <p>Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Data points are represented as two double values separated by a blank character.
* Data points are separated by newline characters.<br>
@@ -63,12 +60,10 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
* For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
* </ul>
*
- * <p>
- * Usage: <code>KMeans --points <path> --centroids <path> --output <path> --iterations <n></code><br>
+ * <p>Usage: <code>KMeans --points <path> --centroids <path> --output <path> --iterations <n></code><br>
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations.
*
- * <p>
- * This example shows how to use:
+ * <p>This example shows how to use:
* <ul>
* <li>Bulk iterations
* <li>Broadcast variables in bulk iterations
@@ -187,7 +182,7 @@ public class KMeans {
}
public double euclideanDistance(Point other) {
- return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+ return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
}
public void clear() {
@@ -210,7 +205,7 @@ public class KMeans {
public Centroid() {}
public Centroid(int id, double x, double y) {
- super(x,y);
+ super(x, y);
this.id = id;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
index e165612..24c30a8 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
@@ -92,7 +92,7 @@ public class KMeansData {
}
return env.fromCollection(centroidList);
}
-
+
public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
List<Point> pointList = new LinkedList<Point>();
for (Object[] point : POINTS) {
@@ -100,5 +100,5 @@ public class KMeansData {
}
return env.fromCollection(pointList);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
index 8f48d0a..9f7c98d 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -16,9 +16,11 @@
* limitations under the License.
*/
-
package org.apache.flink.examples.java.clustering.util;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.clustering.KMeans;
+
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
@@ -27,18 +29,15 @@ import java.text.DecimalFormat;
import java.util.Locale;
import java.util.Random;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.examples.java.clustering.KMeans;
-
/**
* Generates data for the {@link KMeans} example program.
*/
public class KMeansDataGenerator {
-
+
static {
Locale.setDefault(Locale.US);
}
-
+
private static final String CENTERS_FILE = "centers";
private static final String POINTS_FILE = "points";
private static final long DEFAULT_SEED = 4650285087650871364L;
@@ -50,14 +49,14 @@ public class KMeansDataGenerator {
/**
* Main method to generate data for the {@link KMeans} example program.
- * <p>
- * The generator creates to files:
+ *
+ * <p>The generator creates to files:
* <ul>
* <li><code>< output-path >/points</code> for the data points
* <li><code>< output-path >/centers</code> for the cluster centers
- * </ul>
- *
- * @param args
+ * </ul>
+ *
+ * @param args
* <ol>
* <li>Int: Number of data points
* <li>Int: Number of cluster centers
@@ -87,22 +86,21 @@ public class KMeansDataGenerator {
final double range = params.getDouble("range", DEFAULT_VALUE_RANGE);
final long firstSeed = params.getLong("seed", DEFAULT_SEED);
-
final double absoluteStdDev = stddev * range;
final Random random = new Random(firstSeed);
-
+
// the means around which data points are distributed
final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-
+
// write the points out
BufferedWriter pointsOut = null;
try {
- pointsOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+POINTS_FILE)));
+ pointsOut = new BufferedWriter(new FileWriter(new File(outDir + "/" + POINTS_FILE)));
StringBuilder buffer = new StringBuilder();
-
+
double[] point = new double[DIMENSIONALITY];
int nextCentroid = 0;
-
+
for (int i = 1; i <= numDataPoints; i++) {
// generate a point for the current centroid
double[] centroid = means[nextCentroid];
@@ -118,15 +116,15 @@ public class KMeansDataGenerator {
pointsOut.close();
}
}
-
+
// write the uniformly distributed centers to a file
BufferedWriter centersOut = null;
try {
- centersOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+CENTERS_FILE)));
+ centersOut = new BufferedWriter(new FileWriter(new File(outDir + "/" + CENTERS_FILE)));
StringBuilder buffer = new StringBuilder();
-
+
double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-
+
for (int i = 0; i < k; i++) {
writeCenter(i + 1, centers[i], buffer, centersOut);
}
@@ -136,41 +134,41 @@ public class KMeansDataGenerator {
centersOut.close();
}
}
-
- System.out.println("Wrote "+numDataPoints+" data points to "+outDir+"/"+POINTS_FILE);
- System.out.println("Wrote "+k+" cluster centers to "+outDir+"/"+CENTERS_FILE);
+
+ System.out.println("Wrote " + numDataPoints + " data points to " + outDir + "/" + POINTS_FILE);
+ System.out.println("Wrote " + k + " cluster centers to " + outDir + "/" + CENTERS_FILE);
}
-
+
private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
final double halfRange = range / 2;
final double[][] points = new double[num][dimensionality];
-
+
for (int i = 0; i < num; i++) {
- for (int dim = 0; dim < dimensionality; dim ++) {
+ for (int dim = 0; dim < dimensionality; dim++) {
points[i][dim] = (rnd.nextDouble() * range) - halfRange;
}
}
return points;
}
-
+
private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
buffer.setLength(0);
-
+
// write coordinates
for (int j = 0; j < coordinates.length; j++) {
buffer.append(FORMAT.format(coordinates[j]));
- if(j < coordinates.length - 1) {
+ if (j < coordinates.length - 1) {
buffer.append(DELIMITER);
}
}
-
+
out.write(buffer.toString());
out.newLine();
}
-
+
private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
buffer.setLength(0);
-
+
// write id
buffer.append(id);
buffer.append(DELIMITER);
@@ -178,11 +176,11 @@ public class KMeansDataGenerator {
// write coordinates
for (int j = 0; j < coordinates.length; j++) {
buffer.append(FORMAT.format(coordinates[j]));
- if(j < coordinates.length - 1) {
+ if (j < coordinates.length - 1) {
buffer.append(DELIMITER);
}
}
-
+
out.write(buffer.toString());
out.newLine();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
index 82f1c52..a358490 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -18,8 +18,6 @@
package org.apache.flink.examples.java.distcp;
-import org.apache.commons.io.IOUtils;
-
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
@@ -37,6 +35,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +51,12 @@ import java.util.Map;
* (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
* with a dynamic input format
* Note that this tool does not deal with retriability. Additionally, empty directories are not copied over.
- * <p>
- * When running locally, local file systems paths can be used.
+ *
+ * <p>When running locally, local file systems paths can be used.
* However, in a distributed environment HDFS paths must be provided both as input and output.
*/
public class DistCp {
-
+
private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
@@ -100,7 +99,6 @@ public class DistCp {
new FileCopyTaskInputFormat(tasks),
new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
-
FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
private static final long serialVersionUID = 1109254230243989929L;
@@ -139,7 +137,7 @@ public class DistCp {
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
- fileCounter.add(1l);
+ fileCounter.add(1L);
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
index 7f38a8b..4a8f38b 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -18,18 +18,19 @@
package org.apache.flink.examples.java.distcp;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.core.fs.Path;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
/**
- * A Java POJO that represents a task for copying a single file
+ * A Java POJO that represents a task for copying a single file.
*/
public class FileCopyTask implements Serializable {
-
+
private static final long serialVersionUID = -8760082278978316032L;
-
+
private final Path path;
private final String relativePath;
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
index d6e6713..dfd9bf0 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -35,14 +35,14 @@ import java.util.Queue;
/**
* An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
- * that have finished previously assigned tasks
+ * that have finished previously assigned tasks.
*/
public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
private static final long serialVersionUID = -644394866425221151L;
-
+
private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
-
+
private final List<FileCopyTask> tasks;
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
index 33943b6..b7ec0c9 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -21,12 +21,12 @@ package org.apache.flink.examples.java.distcp;
import org.apache.flink.core.io.InputSplit;
/**
- * Implementation of {@code InputSplit} for copying files
+ * Implementation of {@code InputSplit} for copying files.
*/
public class FileCopyTaskInputSplit implements InputSplit {
-
+
private static final long serialVersionUID = -7621656017747660450L;
-
+
private final FileCopyTask task;
private final int splitNumber;
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index 3bd6522..9568b31 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -16,69 +16,63 @@
* limitations under the License.
*/
-
package org.apache.flink.examples.java.graph;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
/**
* An implementation of the connected components algorithm, using a delta iteration.
- *
- * <p>
- * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
+ *
+ * <p>Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
* neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
* same component will have the same ID.
- *
- * <p>
- * A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
+ *
+ * <p>A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
* the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
* their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
* changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
* is consequently also the next workset.<br>
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
* <ul>
- * <li>Vertices represented as IDs and separated by new-line characters.<br>
+ * <li>Vertices represented as IDs and separated by new-line characters.<br>
* For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63).
- * <li>Edges are represented as pairs for vertex IDs which are separated by space
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
* </ul>
- *
- * <p>
- * Usage: <code>ConnectedComponents --vertices <path> --edges <path> --output <path> --iterations <n></code><br>
- * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations.
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>ConnectedComponents --vertices <path> --edges <path> --output <path> --iterations <n></code><br>
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations.
+ *
+ * <p>This example shows how to use:
* <ul>
* <li>Delta Iterations
- * <li>Generic-typed Functions
+ * <li>Generic-typed Functions
* </ul>
*/
@SuppressWarnings("serial")
public class ConnectedComponents {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String... args) throws Exception {
// Checking input parameters
@@ -91,19 +85,19 @@ public class ConnectedComponents {
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
-
+
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env, params);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env, params).flatMap(new UndirectEdge());
-
+
// assign the initial components (equal to the vertex id)
DataSet<Tuple2<Long, Long>> verticesWithInitialId =
vertices.map(new DuplicateValue<Long>());
-
+
// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
-
+
// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
.groupBy(0).aggregate(Aggregations.MIN, 1)
@@ -112,7 +106,7 @@ public class ConnectedComponents {
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-
+
// emit result
if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", " ");
@@ -123,29 +117,29 @@ public class ConnectedComponents {
result.print();
}
}
-
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
-
+
/**
* Function that turns a value into a 2-tuple where both fields are that value.
*/
@ForwardedFields("*->f0")
public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
-
+
@Override
public Tuple2<T, T> map(T vertex) {
return new Tuple2<T, T>(vertex, vertex);
}
}
-
+
/**
* Undirected edges by emitting for each input edge the input edges itself and an inverted version.
*/
public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-
+
@Override
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
invertedEdge.f0 = edge.f1;
@@ -154,7 +148,7 @@ public class ConnectedComponents {
out.collect(invertedEdge);
}
}
-
+
/**
* UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
* a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
@@ -169,9 +163,11 @@ public class ConnectedComponents {
return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
}
}
-
-
+ /**
+ * Emit the candidate (Vertex-ID, Component-ID) pair if and only if the
+ * candidate component ID is less than the vertex's current component ID.
+ */
@ForwardedFieldsFirst("*")
public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -211,6 +207,4 @@ public class ConnectedComponents {
return ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
index 5fbb321..2c553e4 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/EnumTriangles.java
@@ -39,17 +39,15 @@ import java.util.List;
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
- *
- * <p>
- * The algorithm works as follows:
- * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
- * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
+ *
+ * <p>The algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
* that closes the triangle.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
* <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space
+ * <li>Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
* that include a triangle
@@ -59,17 +57,15 @@ import java.util.List;
* / \
* (2)-(12)
* </pre>
- *
- * Usage: <code>EnumTriangleBasic --edges <path> --output <path></code><br>
- * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>EnumTriangleBasic --edges <path> --output <path></code><br>
+ * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
+ *
+ * <p>This example shows how to use:
* <ul>
* <li>Custom Java objects which extend Tuple
* <li>Group Sorting
* </ul>
- *
*/
@SuppressWarnings("serial")
public class EnumTriangles {
@@ -77,7 +73,7 @@ public class EnumTriangles {
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
// Checking input parameters
@@ -88,7 +84,7 @@ public class EnumTriangles {
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
-
+
// read input data
DataSet<Edge> edges;
if (params.has("edges")) {
@@ -106,7 +102,7 @@ public class EnumTriangles {
// project edges by vertex id
DataSet<Edge> edgesById = edges
.map(new EdgeByIdProjector());
-
+
DataSet<Triad> triangles = edgesById
// build triads
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
@@ -128,60 +124,60 @@ public class EnumTriangles {
// USER FUNCTIONS
// *************************************************************************
- /** Converts a Tuple2 into an Edge */
+ /** Converts a Tuple2 into an Edge. */
@ForwardedFields("0;1")
public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
private final Edge outEdge = new Edge();
-
+
@Override
public Edge map(Tuple2<Integer, Integer> t) throws Exception {
outEdge.copyVerticesFromTuple2(t);
return outEdge;
}
}
-
+
/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
-
+
@Override
public Edge map(Edge inEdge) throws Exception {
-
+
// flip vertices if necessary
- if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
+ if (inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
inEdge.flipVertices();
}
-
+
return inEdge;
}
}
-
+
/**
* Builds triads (triples of vertices) from pairs of edges that share a vertex.
- * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
+ * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
@ForwardedFields("0")
private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
private final List<Integer> vertices = new ArrayList<Integer>();
private final Triad outTriad = new Triad();
-
+
@Override
public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
-
+
final Iterator<Edge> edges = edgesIter.iterator();
-
+
// clear vertex list
vertices.clear();
-
+
// read first edge
Edge firstEdge = edges.next();
outTriad.setFirstVertex(firstEdge.getFirstVertex());
vertices.add(firstEdge.getSecondVertex());
-
+
// build and emit triads
while (edges.hasNext()) {
Integer higherVertexId = edges.next().getSecondVertex();
-
+
// combine vertex with all previously read vertices
for (Integer lowerVertexId : vertices) {
outTriad.setSecondVertex(lowerVertexId);
@@ -192,14 +188,14 @@ public class EnumTriangles {
}
}
}
-
+
/** Filters triads (three vertices connected by two edges) without a closing third edge. */
private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
-
+
@Override
public Triad join(Triad triad, Edge edge) throws Exception {
return triad;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
index 33305af..f22f2e6 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java
@@ -38,32 +38,28 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
/**
* A basic implementation of the Page Rank algorithm using a bulk iteration.
- *
- * <p>
- * This implementation requires a set of pages and a set of directed links as input and works as follows. <br>
+ *
+ * <p>This implementation requires a set of pages and a set of directed links as input and works as follows. <br>
* In each iteration, the rank of every page is evenly distributed to all pages it points to.
* Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
* The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
* This implementation terminates after a fixed number of iterations.<br>
- * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>.
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
* <ul>
- * <li>Pages represented as an (long) ID separated by new-line characters.<br>
+ * <li>Pages represented as an (long) ID separated by new-line characters.<br>
* For example <code>"1\n2\n12\n42\n63"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
- * <li>Links are represented as pairs of page IDs which are separated by space
+ * <li>Links are represented as pairs of page IDs which are separated by space
* characters. Links are separated by new-line characters.<br>
* For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br>
* For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
* </ul>
- *
- * <p>
- * Usage: <code>PageRankBasic --pages <path> --links <path> --output <path> --numPages <n> --iterations <n></code><br>
+ *
+ * <p>Usage: <code>PageRankBasic --pages <path> --links <path> --output <path> --numPages <n> --iterations <n></code><br>
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.PageRankData} and 10 iterations.
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
* <ul>
* <li>Bulk Iterations
* <li>Default Join
@@ -72,42 +68,42 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
*/
@SuppressWarnings("serial")
public class PageRank {
-
+
private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final int numPages = params.getInt("numPages", PageRankData.getNumberOfPages());
final int maxIterations = params.getInt("iterations", 10);
-
+
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make the parameters available to the web ui
env.getConfig().setGlobalJobParameters(params);
-
+
// get input data
DataSet<Long> pagesInput = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env, params);
-
+
// assign initial rank to pages
DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
map(new RankAssigner((1.0d / numPages)));
-
+
// build adjacency list from link input
- DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
+ DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
-
+
// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
-
+
DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
@@ -115,9 +111,9 @@ public class PageRank {
.groupBy(0).aggregate(SUM, 1)
// apply dampening factor
.map(new Dampener(DAMPENING_FACTOR, numPages));
-
+
DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
- newRanks,
+ newRanks,
newRanks.join(iteration).where(0).equalTo(0)
// termination condition
.filter(new EpsilonFilter()));
@@ -131,45 +127,43 @@ public class PageRank {
System.out.println("Printing result to stdout. Use --output to specify output path.");
finalPageRanks.print();
}
-
-
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
- /**
- * A map function that assigns an initial rank to all pages.
+ /**
+ * A map function that assigns an initial rank to all pages.
*/
public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
Tuple2<Long, Double> outPageWithRank;
-
+
public RankAssigner(double rank) {
this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank);
}
-
+
@Override
public Tuple2<Long, Double> map(Long page) {
outPageWithRank.f0 = page;
return outPageWithRank;
}
}
-
+
/**
* A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
* originate. Run as a pre-processing step.
*/
@ForwardedFields("0")
public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
-
+
private final ArrayList<Long> neighbors = new ArrayList<Long>();
-
+
@Override
public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
neighbors.clear();
Long id = 0L;
-
+
for (Tuple2<Long, Long> n : values) {
id = n.f0;
neighbors.add(n.f1);
@@ -177,7 +171,7 @@ public class PageRank {
out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
}
}
-
+
/**
* Join function that distributes a fraction of a vertex's rank to all neighbors.
*/
@@ -194,16 +188,16 @@ public class PageRank {
}
}
}
-
+
/**
- * The function that applies the page rank dampening formula
+ * The function that applies the page rank dampening formula.
*/
@ForwardedFields("0")
- public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+ public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
private final double dampening;
private final double randomJump;
-
+
public Dampener(double dampening, double numVertices) {
this.dampening = dampening;
this.randomJump = (1 - dampening) / numVertices;
@@ -215,7 +209,7 @@ public class PageRank {
return value;
}
}
-
+
/**
* Filter that filters vertices where the rank difference is below a threshold.
*/
@@ -226,11 +220,11 @@ public class PageRank {
return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env, ParameterTool params) {
if (params.has("pages")) {
return env.readCsvFile(params.get("pages"))
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index 50e86ec..2857a0c 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -32,6 +32,15 @@ import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;
+/**
+ * The transitive closure of a graph contains an edge for each pair of vertices
+ * which are endpoints of at least one path in the graph.
+ *
+ * <p>This algorithm is implemented using a delta iteration. The transitive
+ * closure solution set is grown in each step by joining the workset of newly
+ * discovered path endpoints with the original graph edges and discarding
+ * previously discovered path endpoints (already in the solution set).
+ */
@SuppressWarnings("serial")
public class TransitiveClosureNaive {
@@ -57,9 +66,9 @@ public class TransitiveClosureNaive {
edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
- IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations);
+ IterativeDataSet<Tuple2<Long, Long>> paths = edges.iterate(maxIterations);
- DataSet<Tuple2<Long,Long>> nextPaths = paths
+ DataSet<Tuple2<Long, Long>> nextPaths = paths
.join(edges)
.where(1)
.equalTo(0)
@@ -83,17 +92,17 @@ public class TransitiveClosureNaive {
}
}).withForwardedFields("0;1");
- DataSet<Tuple2<Long,Long>> newPaths = paths
+ DataSet<Tuple2<Long, Long>> newPaths = paths
.coGroup(nextPaths)
.where(0).equalTo(0)
.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
- Set<Tuple2<Long,Long>> prevSet = new HashSet<Tuple2<Long,Long>>();
+ Set<Tuple2<Long, Long>> prevSet = new HashSet<Tuple2<Long, Long>>();
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
- for (Tuple2<Long,Long> prev : prevPaths) {
+ for (Tuple2<Long, Long> prev : prevPaths) {
prevSet.add(prev);
}
- for (Tuple2<Long,Long> next: nextPaths) {
+ for (Tuple2<Long, Long> next: nextPaths) {
if (!prevSet.contains(next)) {
out.collect(next);
}
@@ -103,7 +112,6 @@ public class TransitiveClosureNaive {
DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
-
// emit result
if (params.has("output")) {
transitiveClosure.writeAsCsv(params.get("output"), "\n", " ");
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
index dd1f596..6fb39d8 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -18,12 +18,12 @@
package org.apache.flink.examples.java.graph.util;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.LinkedList;
+import java.util.List;
/**
* Provides the default data sets used for the Connected Components example program.
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*
*/
public class ConnectedComponentsData {
-
+
public static final long[] VERTICES = new long[] {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
@@ -42,7 +42,7 @@ public class ConnectedComponentsData {
}
return env.fromCollection(verticesList);
}
-
+
public static final Object[][] EDGES = new Object[][] {
new Object[]{1L, 2L},
new Object[]{2L, 3L},
@@ -59,14 +59,14 @@ public class ConnectedComponentsData {
new Object[]{1L, 15L},
new Object[]{16L, 1L}
};
-
+
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
+
List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
for (Object[] edge : EDGES) {
edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
}
return env.fromCollection(edgeList);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
index a54b3da..cc3c3ac 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.examples.java.graph.util;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Provides the default data sets used for the Triangle Enumeration example programs.
* The default data sets are used, if no parameters are given to the program.
@@ -36,7 +35,7 @@ public class EnumTrianglesData {
public static final Object[][] EDGES = {
{1, 2},
{1, 3},
- {1 ,4},
+ {1, 4},
{1, 5},
{2, 3},
{2, 5},
@@ -46,14 +45,14 @@ public class EnumTrianglesData {
{5, 6},
{7, 8}
};
-
+
public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
+
List<EnumTrianglesDataTypes.Edge> edges = new ArrayList<EnumTrianglesDataTypes.Edge>();
- for(Object[] e : EDGES) {
- edges.add(new Edge((Integer)e[0], (Integer)e[1]));
+ for (Object[] e : EDGES) {
+ edges.add(new Edge((Integer) e[0], (Integer) e[1]));
}
-
+
return env.fromCollection(edges);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
index 5c6e8b0..0aa0ab0 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
@@ -22,62 +22,88 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
+/**
+ * The data classes for EnumTriangles.
+ */
public class EnumTrianglesDataTypes {
+ /**
+ * A POJO storing two vertex IDs.
+ */
public static class Edge extends Tuple2<Integer, Integer> {
private static final long serialVersionUID = 1L;
-
+
public static final int V1 = 0;
public static final int V2 = 1;
-
+
public Edge() {}
-
+
public Edge(final Integer v1, final Integer v2) {
this.setFirstVertex(v1);
this.setSecondVertex(v2);
}
-
- public Integer getFirstVertex() { return this.getField(V1); }
-
- public Integer getSecondVertex() { return this.getField(V2); }
-
- public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-
- public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-
+
+ public Integer getFirstVertex() {
+ return this.getField(V1);
+ }
+
+ public Integer getSecondVertex() {
+ return this.getField(V2);
+ }
+
+ public void setFirstVertex(final Integer vertex1) {
+ this.setField(vertex1, V1);
+ }
+
+ public void setSecondVertex(final Integer vertex2) {
+ this.setField(vertex2, V2);
+ }
+
public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
this.setFirstVertex(t.f0);
this.setSecondVertex(t.f1);
}
-
+
public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) {
this.setFirstVertex(ewd.getFirstVertex());
this.setSecondVertex(ewd.getSecondVertex());
}
-
+
public void flipVertices() {
Integer tmp = this.getFirstVertex();
this.setFirstVertex(this.getSecondVertex());
this.setSecondVertex(tmp);
}
}
-
+
+ /**
+ * A POJO storing three vertex IDs.
+ */
public static class Triad extends Tuple3<Integer, Integer, Integer> {
private static final long serialVersionUID = 1L;
-
+
public static final int V1 = 0;
public static final int V2 = 1;
public static final int V3 = 2;
-
+
public Triad() {}
-
- public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-
- public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-
- public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); }
+
+ public void setFirstVertex(final Integer vertex1) {
+ this.setField(vertex1, V1);
+ }
+
+ public void setSecondVertex(final Integer vertex2) {
+ this.setField(vertex2, V2);
+ }
+
+ public void setThirdVertex(final Integer vertex3) {
+ this.setField(vertex3, V3);
+ }
}
-
+
+ /**
+ * A POJO storing two vertex IDs with degree.
+ */
public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
private static final long serialVersionUID = 1L;
@@ -85,25 +111,41 @@ public class EnumTrianglesDataTypes {
public static final int V2 = 1;
public static final int D1 = 2;
public static final int D2 = 3;
-
+
public EdgeWithDegrees() { }
-
- public Integer getFirstVertex() { return this.getField(V1); }
-
- public Integer getSecondVertex() { return this.getField(V2); }
-
- public Integer getFirstDegree() { return this.getField(D1); }
-
- public Integer getSecondDegree() { return this.getField(D2); }
-
- public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); }
-
- public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); }
-
- public void setFirstDegree(final Integer degree1) { this.setField(degree1, D1); }
-
- public void setSecondDegree(final Integer degree2) { this.setField(degree2, D2); }
-
+
+ public Integer getFirstVertex() {
+ return this.getField(V1);
+ }
+
+ public Integer getSecondVertex() {
+ return this.getField(V2);
+ }
+
+ public Integer getFirstDegree() {
+ return this.getField(D1);
+ }
+
+ public Integer getSecondDegree() {
+ return this.getField(D2);
+ }
+
+ public void setFirstVertex(final Integer vertex1) {
+ this.setField(vertex1, V1);
+ }
+
+ public void setSecondVertex(final Integer vertex2) {
+ this.setField(vertex2, V2);
+ }
+
+ public void setFirstDegree(final Integer degree1) {
+ this.setField(degree1, D1);
+ }
+
+ public void setSecondDegree(final Integer degree2) {
+ this.setField(degree2, D2);
+ }
+
public void copyFrom(final EdgeWithDegrees edge) {
this.setFirstVertex(edge.getFirstVertex());
this.setSecondVertex(edge.getSecondVertex());
@@ -111,6 +153,4 @@ public class EnumTrianglesDataTypes {
this.setSecondDegree(edge.getSecondDegree());
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
index f2d9078..1c0bde7 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
@@ -18,13 +18,13 @@
package org.apache.flink.examples.java.graph.util;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Provides the default data sets used for the PageRank example program.
* The default data sets are used, if no parameters are given to the program.
@@ -63,24 +63,24 @@ public class PageRankData {
{14L, 12L},
{15L, 1L},
};
-
+
private static int numPages = 15;
-
+
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
+
List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
- for(Object[] e : EDGES) {
- edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
+ for (Object[] e : EDGES) {
+ edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
}
return env.fromCollection(edges);
}
-
+
public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
return env.generateSequence(1, 15);
}
-
+
public static int getNumberOfPages() {
return numPages;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
index 44b566b..79ac9ec 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
@@ -18,39 +18,41 @@
package org.apache.flink.examples.java.misc;
-import java.util.List;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
-/**
+import java.util.List;
+
+/**
* This example shows how to use the collection based execution of Flink.
- *
- * The collection based execution is a local mode that is not using the full Flink runtime.
+ *
+ * <p>The collection based execution is a local mode that is not using the full Flink runtime.
* DataSet transformations are executed on Java collections.
- *
- * See the "Local Execution" section in the documentation for more details:
+ *
+ * <p>See the "Local Execution" section in the documentation for more details:
* http://flink.apache.org/docs/latest/apis/local_execution.html
- *
*/
public class CollectionExecutionExample {
-
+
/**
- * POJO class representing a user
+ * POJO class representing a user.
*/
public static class User {
public int userIdentifier;
public String name;
+
public User() {}
+
public User(int userIdentifier, String name) {
this.userIdentifier = userIdentifier; this.name = name;
}
+
public String toString() {
- return "User{userIdentifier="+userIdentifier+" name="+name+"}";
+ return "User{userIdentifier=" + userIdentifier + " name=" + name + "}";
}
}
-
+
/**
* POJO for an EMail.
*/
@@ -58,36 +60,40 @@ public class CollectionExecutionExample {
public int userId;
public String subject;
public String body;
+
public EMail() {}
+
public EMail(int userId, String subject, String body) {
this.userId = userId; this.subject = subject; this.body = body;
}
+
public String toString() {
- return "eMail{userId="+userId+" subject="+subject+" body="+body+"}";
+ return "eMail{userId=" + userId + " subject=" + subject + " body=" + body + "}";
}
-
+
}
+
public static void main(String[] args) throws Exception {
// initialize a new Collection-based execution environment
final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-
+
// create objects for users and emails
User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
-
+
EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
new EMail(1, "Re: Meeting", "Sorry, I'm not availble"),
new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
-
+
// convert objects into a DataSet
DataSet<User> users = env.fromElements(usersArray);
DataSet<EMail> emails = env.fromElements(emailsArray);
-
+
// join the two DataSets
- DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
-
+ DataSet<Tuple2<User, EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
+
// retrieve the resulting Tuple2 elements into a ArrayList.
- List<Tuple2<User,EMail>> result = joined.collect();
-
+ List<Tuple2<User, EMail>> result = joined.collect();
+
// Do some work with the resulting ArrayList (=Collection).
for (Tuple2<User, EMail> t : result) {
System.err.println("Result = " + t);
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index fc85110..f33d095 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -23,39 +23,38 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-/**
+/**
* Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle
+ * The area of a circle is Pi * R^2, R being the radius of the circle
* The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- *
- * Thus Pi = 4 * (area of circle / area of square).
- *
- * The idea is to find a way to estimate the circle to square area ratio.
+ *
+ * <p>Thus Pi = 4 * (area of circle / area of square).
+ *
+ * <p>The idea is to find a way to estimate the circle to square area ratio.
* The Monte Carlo method suggests collecting random points (within the square)
* and then counting the number of points that fall within the circle
- *
+ *
* <pre>
* {@code
* x = Math.random()
* y = Math.random()
- *
+ *
* x * x + y * y < 1
* }
* </pre>
*/
@SuppressWarnings("serial")
public class PiEstimation implements java.io.Serializable {
-
-
+
public static void main(String[] args) throws Exception {
final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// count how many of the samples would randomly fall into
// the unit circle
- DataSet<Long> count =
+ DataSet<Long> count =
env.generateSequence(1, numSamples)
.map(new Sampler())
.reduce(new SumReducer());
@@ -68,9 +67,9 @@ public class PiEstimation implements java.io.Serializable {
//*************************************************************************
// USER FUNCTIONS
//*************************************************************************
-
-
- /**
+
+
+ /**
* Sampler randomly emits points that fall within a square of edge x * y.
* It calculates the distance to the center of a virtually centered circle of radius x = y = 1
* If the distance is less than 1, then and only then does it returns a 1.
@@ -85,8 +84,8 @@ public class PiEstimation implements java.io.Serializable {
}
}
-
- /**
+
+ /**
* Simply sums up all long values.
*/
public static final class SumReducer implements ReduceFunction<Long>{
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 90ad67a..7e9d41d 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -18,45 +18,41 @@
package org.apache.flink.examples.java.ml;
-import java.io.Serializable;
-import java.util.Collection;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.ml.util.LinearRegressionData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
+
+import java.io.Serializable;
+import java.util.Collection;
/**
* This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
*
- * <p>
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
+ * <p>Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
* Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
* In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
* The algorithm terminates after a fixed number of iterations (as in this implementation)
* With enough iteration, the algorithm can minimize the cost function and find the best parameters
* This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- *
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
+ *
+ * <p>This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
* It find the best Theta parameter to fit the target.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
+ *
+ * <p>Input files are plain text files and must be formatted as follows:
* <ul>
* <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
* Data points are separated by newline characters.<br>
* For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
* </ul>
- *
- * <p>
- * This example shows how to use:
+ *
+ * <p>This example shows how to use:
* <ul>
* <li> Bulk iterations
* <li> Broadcast variables in bulk iterations
@@ -102,7 +98,7 @@ public class LinearRegression {
// set number of bulk iterations for SGD linear Regression
IterativeDataSet<Params> loop = parameters.iterate(iterations);
- DataSet<Params> new_parameters = data
+ DataSet<Params> newParameters = data
// compute a single step using every sample
.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
// sum up all the steps
@@ -111,10 +107,10 @@ public class LinearRegression {
.map(new Update());
// feed new parameters back into next iteration
- DataSet<Params> result = loop.closeWith(new_parameters);
+ DataSet<Params> result = loop.closeWith(newParameters);
// emit result
- if(params.has("output")) {
+ if (params.has("output")) {
result.writeAsText(params.get("output"));
// execute program
env.execute("Linear Regression example");
@@ -132,11 +128,11 @@ public class LinearRegression {
* A simple data sample, x means the input, and y means the target.
*/
public static class Data implements Serializable{
- public double x,y;
+ public double x, y;
- public Data() {};
+ public Data() {}
- public Data(double x ,double y){
+ public Data(double x, double y) {
this.x = x;
this.y = y;
}
@@ -153,11 +149,11 @@ public class LinearRegression {
*/
public static class Params implements Serializable{
- private double theta0,theta1;
+ private double theta0, theta1;
- public Params(){};
+ public Params() {}
- public Params(double x0, double x1){
+ public Params(double x0, double x1) {
this.theta0 = x0;
this.theta1 = x1;
}
@@ -183,9 +179,9 @@ public class LinearRegression {
this.theta1 = theta1;
}
- public Params div(Integer a){
- this.theta0 = theta0 / a ;
- this.theta1 = theta1 / a ;
+ public Params div(Integer a) {
+ this.theta0 = theta0 / a;
+ this.theta1 = theta1 / a;
return this;
}
@@ -198,9 +194,9 @@ public class LinearRegression {
/**
* Compute a single BGD type update for every parameters.
*/
- public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> {
+ public static class SubUpdate extends RichMapFunction<Data, Tuple2<Params, Integer>> {
- private Collection<Params> parameters;
+ private Collection<Params> parameters;
private Params parameter;
@@ -215,18 +211,18 @@ public class LinearRegression {
@Override
public Tuple2<Params, Integer> map(Data in) throws Exception {
- for(Params p : parameters){
- this.parameter = p;
+ for (Params p : parameters){
+ this.parameter = p;
}
- double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
- double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
+ double theta0 = parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y);
+ double theta1 = parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x);
- return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count);
+ return new Tuple2<Params, Integer>(new Params(theta0, theta1), count);
}
}
- /**
+ /**
* Accumulator all the update.
* */
public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
@@ -234,10 +230,10 @@ public class LinearRegression {
@Override
public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
- double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
- double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
- Params result = new Params(new_theta0,new_theta1);
- return new Tuple2<Params, Integer>( result, val1.f1 + val2.f1);
+ double newTheta0 = val1.f0.theta0 + val2.f0.theta0;
+ double newTheta1 = val1.f0.theta1 + val2.f0.theta1;
+ Params result = new Params(newTheta0, newTheta1);
+ return new Tuple2<Params, Integer>(result, val1.f1 + val2.f1);
}
}
@@ -245,7 +241,7 @@ public class LinearRegression {
/**
* Compute the final update by average them.
*/
- public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> {
+ public static class Update implements MapFunction<Tuple2<Params, Integer>, Params> {
@Override
public Params map(Tuple2<Params, Integer> arg0) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
index 838e320..3d0ad03 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
@@ -20,8 +20,8 @@ package org.apache.flink.examples.java.ml.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.ml.LinearRegression.Params;
import org.apache.flink.examples.java.ml.LinearRegression.Data;
+import org.apache.flink.examples.java.ml.LinearRegression.Params;
import java.util.LinkedList;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
index a9f9e08..96ee56c 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
@@ -43,13 +43,13 @@ public class LinearRegressionDataGenerator {
/**
* Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
- * <p>
- * The generator creates to files:
+ *
+ * <p>The generator creates to files:
* <ul>
* <li><code>{tmp.dir}/data</code> for the data points
- * </ul>
- *
- * @param args
+ * </ul>
+ *
+ * @param args
* <ol>
* <li>Int: Number of data points
* <li><b>Optional</b> Long: Random seed
@@ -72,15 +72,15 @@ public class LinearRegressionDataGenerator {
// write the points out
BufferedWriter pointsOut = null;
try {
- pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
+ pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir + "/" + POINTS_FILE)));
StringBuilder buffer = new StringBuilder();
// DIMENSIONALITY + 1 means that the number of x(dimensionality) and target y
- double[] point = new double[DIMENSIONALITY+1];
+ double[] point = new double[DIMENSIONALITY + 1];
for (int i = 1; i <= numDataPoints; i++) {
point[0] = random.nextGaussian();
- point[1] = 2 * point[0] + 0.01*random.nextGaussian();
+ point[1] = 2 * point[0] + 0.01 * random.nextGaussian();
writePoint(point, buffer, pointsOut);
}
@@ -91,17 +91,16 @@ public class LinearRegressionDataGenerator {
}
}
- System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
+ System.out.println("Wrote " + numDataPoints + " data points to " + tmpDir + "/" + POINTS_FILE);
}
-
private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
buffer.setLength(0);
// write coordinates
for (int j = 0; j < data.length; j++) {
buffer.append(FORMAT.format(data[j]));
- if(j < data.length - 1) {
+ if (j < data.length - 1) {
buffer.append(DELIMITER);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/789ed8a8/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 87b5bff..feec3ef 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -18,32 +18,32 @@
package org.apache.flink.examples.java.relational;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
* This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
* column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
* most contain whitespace characters like space and tab.
- * <p>
- * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
+ *
+ * <p>The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
* and three columns. See {@link #getDataSet(ExecutionEnvironment, ParameterTool)} for configuration.
- * <p>
- * Usage: <code>EmptyFieldsCountAccumulator --input <path> --output <path></code> <br>
- * <p>
- * This example shows how to use:
+ *
+ * <p>Usage: <code>EmptyFieldsCountAccumulator --input <path> --output <path></code> <br>
+ *
+ * <p>This example shows how to use:
* <ul>
* <li>custom accumulators
* <li>tuple data types
@@ -122,7 +122,7 @@ public class EmptyFieldsCountAccumulator {
/**
* This function filters all incoming tuples that have one or more empty fields.
- * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
+ * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
* {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
*/
public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {