You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/01 21:04:35 UTC
[1/2] git commit: Add Pi approximation Java example
Repository: incubator-flink
Updated Branches:
refs/heads/master c9dd60385 -> 4d4151d53
Add Pi approximation Java example
This closes #78
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/01ff302e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/01ff302e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/01ff302e
Branch: refs/heads/master
Commit: 01ff302ef6cac731de26c231ba28e88f6b8a0ead
Parents: c9dd603
Author: Mariem Ayadi <ma...@smith.edu>
Authored: Thu Jul 24 14:32:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 20:37:32 2014 +0200
----------------------------------------------------------------------
.../flink/example/java/pi/PiEstimation.java | 137 +++++++++++++++++++
1 file changed, 137 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01ff302e/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
new file mode 100644
index 0000000..a09a2d6
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.pi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+
+/**
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ *
+ * Thus Pi = 4 * (area of circle / area of square).
+ *
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points (within the square)
+ * ```
+ * x = Math.random() * 2 - 1
+ * y = Math.random() * 2 - 1
+ * ```
+ * then counting the number of points that fall within the circle
+ * ```
+ * x * x + y * y < 1
+ * ```
+ */
+public class PiEstimation {
+
+ public static void main(String[] args) throws Exception {
+
+ //Sets up the execution environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ //Sets the degree of parallelism
+ int degOfParal = (env.getDegreeOfParallelism() > 0) ? env.getDegreeOfParallelism() : 2;
+
+ int n = 100000 * degOfParal;
+
+ DataSet<Integer> dataSet = env.generateSequence(0l, n)
+ .map(new MapFunction<Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ //Converts from Long to Integer, explicitly choosing "1" as the returned value.
+ //(Will later be used by the mapper for summation purposes.)
+ @Override
+ public Integer map(Long value) throws Exception {
+ return 1;
+ }
+ });
+
+ DataSet<Double> count = dataSet
+ .filter(new PiFilter())
+ .setParallelism(degOfParal)
+ .reduce(new PiReducer())
+ .map(new PiMapper(n));
+
+ System.out.println("We estimate Pi to be:");
+ count.print();
+
+ env.execute();
+ }
+
+ //*************************************************************************
+ // USER FUNCTIONS
+ //*************************************************************************
+
+ // FilterFunction that filters out all Integers smaller than zero.
+
+ /**
+ * PiFilter randomly emits points that fall within a square of edge 2*x = 2*y = 2.
+ * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
+ * If the distance is less than 1, then and only then does it return a value (in this case 1) - later used by PiMapper.
+ */
+ public static class PiFilter extends FilterFunction<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Integer value) throws Exception{
+ double x = Math.random() * 2 - 1;
+ double y = Math.random() * 2 - 1;
+ return (x * x + y * y) < 1;
+ }
+ }
+
+
+ /**
+ * PiReducer takes over the filter. It goes through the selected 1s and returns the sum.
+ */
+ public static final class PiReducer extends ReduceFunction<Integer>{
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+ }
+
+
+ /**
+ * The PiMapper's role is to apply one final operation on the count thus returning the estimated Pi value.
+ */
+ public static final class PiMapper extends MapFunction<Integer,Double> {
+ private static final long serialVersionUID = 1L;
+ private int n;
+
+ public PiMapper(int n) {
+ this.n = n;
+ }
+
+ @Override
+ public Double map(Integer intSum) throws Exception {
+ return intSum*4.0 / this.n;
+ }
+ }
+
+}
\ No newline at end of file
[2/2] git commit: Some simplifications to the PiEstimation example
Posted by se...@apache.org.
Some simplifications to the PiEstimation example
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4d4151d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4d4151d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4d4151d5
Branch: refs/heads/master
Commit: 4d4151d537423caa428df0c89e60ccafd74fa8fe
Parents: 01ff302
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 20:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 20:56:26 2014 +0200
----------------------------------------------------------------------
.../flink/example/java/misc/PiEstimation.java | 108 +++++++++++++++
.../flink/example/java/pi/PiEstimation.java | 137 -------------------
2 files changed, 108 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d4151d5/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
new file mode 100644
index 0000000..ef336da
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.misc;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ *
+ * Thus Pi = 4 * (area of circle / area of square).
+ *
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points within the square (one quadrant
+ * suffices, by symmetry) and then counting the number of points that fall within the circle.
+ *
+ * <pre>
+ * {@code
+ * x = Math.random()
+ * y = Math.random()
+ *
+ * x * x + y * y < 1
+ * }
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class PiEstimation implements java.io.Serializable {
+
+
+ public static void main(String[] args) throws Exception {
+
+ final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // count how many of the samples would randomly fall into
+ // the unit circle
+ DataSet<Long> count =
+ env.generateSequence(1, numSamples)
+ .map(new Sampler())
+ .reduce(new SumReducer());
+
+ // the fraction of samples inside the quarter unit circle approximates pi / 4, so multiply by 4
+ DataSet<Double> pi = count
+ .map(new MapFunction<Long, Double>() {
+ public Double map(Long value) {
+ return value * 4.0 / numSamples;
+ }
+ });
+
+ System.out.println("We estimate Pi to be:");
+ pi.print();
+
+ env.execute();
+ }
+
+ //*************************************************************************
+ // USER FUNCTIONS
+ //*************************************************************************
+
+
+ /**
+ * Sampler draws a random point (x, y) from the unit square [0, 1) x [0, 1).
+ * It computes the squared distance of that point to the origin, the center of the unit circle.
+ * If that squared distance is less than 1, it returns 1; otherwise it returns 0.
+ */
+ public static class Sampler implements MapFunction<Long, Long> {
+
+ @Override
+ public Long map(Long value) throws Exception{
+ double x = Math.random();
+ double y = Math.random();
+ return (x * x + y * y) < 1 ? 1L : 0L;
+ }
+ }
+
+
+ /**
+ * Simply sums up all long values.
+ */
+ public static final class SumReducer implements ReduceFunction<Long>{
+
+ @Override
+ public Long reduce(Long value1, Long value2) throws Exception {
+ return value1 + value2;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d4151d5/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
deleted file mode 100644
index a09a2d6..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.pi;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
-
-/**
- * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle
- * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- *
- * Thus Pi = 4 * (area of circle / area of square).
- *
- * The idea is to find a way to estimate the circle to square area ratio.
- * The Monte Carlo method suggests collecting random points (within the square)
- * ```
- * x = Math.random() * 2 - 1
- * y = Math.random() * 2 - 1
- * ```
- * then counting the number of points that fall within the circle
- * ```
- * x * x + y * y < 1
- * ```
- */
-public class PiEstimation {
-
- public static void main(String[] args) throws Exception {
-
- //Sets up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- //Sets the degree of parallelism
- int degOfParal = (env.getDegreeOfParallelism() > 0) ? env.getDegreeOfParallelism() : 2;
-
- int n = 100000 * degOfParal;
-
- DataSet<Integer> dataSet = env.generateSequence(0l, n)
- .map(new MapFunction<Long, Integer>() {
- private static final long serialVersionUID = 1L;
-
- //Converts from Long to Integer, explicitly choosing "1" as the returned value.
- //(Will later be used by the mapper for summation purposes.)
- @Override
- public Integer map(Long value) throws Exception {
- return 1;
- }
- });
-
- DataSet<Double> count = dataSet
- .filter(new PiFilter())
- .setParallelism(degOfParal)
- .reduce(new PiReducer())
- .map(new PiMapper(n));
-
- System.out.println("We estimate Pi to be:");
- count.print();
-
- env.execute();
- }
-
- //*************************************************************************
- // USER FUNCTIONS
- //*************************************************************************
-
- // FilterFunction that filters out all Integers smaller than zero.
-
- /**
- * PiFilter randomly emits points that fall within a square of edge 2*x = 2*y = 2.
- * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
- * If the distance is less than 1, then and only then does it return a value (in this case 1) - later used by PiMapper.
- */
- public static class PiFilter extends FilterFunction<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Integer value) throws Exception{
- double x = Math.random() * 2 - 1;
- double y = Math.random() * 2 - 1;
- return (x * x + y * y) < 1;
- }
- }
-
-
- /**
- * PiReducer takes over the filter. It goes through the selected 1s and returns the sum.
- */
- public static final class PiReducer extends ReduceFunction<Integer>{
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }
-
-
- /**
- * The PiMapper's role is to apply one final operation on the count thus returning the estimated Pi value.
- */
- public static final class PiMapper extends MapFunction<Integer,Double> {
- private static final long serialVersionUID = 1L;
- private int n;
-
- public PiMapper(int n) {
- this.n = n;
- }
-
- @Override
- public Double map(Integer intSum) throws Exception {
- return intSum*4.0 / this.n;
- }
- }
-
-}
\ No newline at end of file