You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/01 21:04:35 UTC

[1/2] git commit: Add Pi approximation Java example

Repository: incubator-flink
Updated Branches:
  refs/heads/master c9dd60385 -> 4d4151d53


Add Pi approximation Java example

This closes #78


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/01ff302e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/01ff302e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/01ff302e

Branch: refs/heads/master
Commit: 01ff302ef6cac731de26c231ba28e88f6b8a0ead
Parents: c9dd603
Author: Mariem Ayadi <ma...@smith.edu>
Authored: Thu Jul 24 14:32:01 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 20:37:32 2014 +0200

----------------------------------------------------------------------
 .../flink/example/java/pi/PiEstimation.java     | 137 +++++++++++++++++++
 1 file changed, 137 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01ff302e/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
new file mode 100644
index 0000000..a09a2d6
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.pi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.ReduceFunction;
+
+/** 
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle 
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ * 
+ * Thus Pi = 4 * (area of circle / area of square).
+ * 
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points (within the square)
+ * ```
+ * x = Math.random() * 2 - 1
+ * y = Math.random() * 2 - 1
+ * ```
+ * then counting the number of points that fall within the circle 
+ * ```
+ * x * x + y * y < 1
+ * ```
+ */
+public class PiEstimation {
+	
+	public static void main(String[] args) throws Exception {
+
+		//Sets up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		//Sets the degree of parallelism
+		int degOfParal = (env.getDegreeOfParallelism() > 0) ? env.getDegreeOfParallelism() : 2;
+		
+		int n = 100000 * degOfParal;
+		
+		DataSet<Integer> dataSet = env.generateSequence(0l, n)
+				.map(new MapFunction<Long, Integer>() {
+					private static final long serialVersionUID = 1L;
+					
+					//Converts from Long to Integer, explicitly choosing "1" as the returned value. 
+					//(Will later be used by the mapper for summation purposes.)
+					@Override
+					public Integer map(Long value) throws Exception {
+						return 1;
+					}
+				});
+
+		DataSet<Double> count = dataSet
+				.filter(new PiFilter())
+				.setParallelism(degOfParal)
+				.reduce(new PiReducer())
+				.map(new PiMapper(n));
+
+		System.out.println("We estimate Pi to be:");
+		count.print();
+
+		env.execute();
+	}
+
+	//*************************************************************************
+	//     USER FUNCTIONS
+	//*************************************************************************
+	
+	// FilterFunction that filters out all Integers smaller than zero.
+	
+	/** 
+	 * PiFilter randomly emits points that fall within a square of edge 2*x = 2*y = 2.
+	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
+	 * If the distance is less than 1, then and only then does it return a value (in this case 1) - later used by PiMapper.
+	 */
+	public static class PiFilter extends FilterFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Integer value) throws Exception{
+			double x = Math.random() * 2 - 1;
+			double y = Math.random() * 2 - 1;
+			return (x * x + y * y) < 1;
+		}
+	}
+
+	
+	/** 
+	 * PiReducer takes over the filter. It goes through the selected 1s and returns the sum.
+	 */
+	public static final class PiReducer extends ReduceFunction<Integer>{
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+	
+	
+	/** 
+	 * The PiMapper's role is to apply one final operation on the count thus returning the estimated Pi value.
+	 */
+	public static final class PiMapper extends MapFunction<Integer,Double> {
+		private static final long serialVersionUID = 1L;
+		private int n;
+		
+		public PiMapper(int n) {
+			this.n = n;
+		}
+		
+		@Override
+		public Double map(Integer intSum) throws Exception {
+			return intSum*4.0 / this.n;
+		}
+	}
+	
+}
\ No newline at end of file


[2/2] git commit: Some simplifications to the PiEstimation example

Posted by se...@apache.org.
Some simplifications to the PiEstimation example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4d4151d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4d4151d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4d4151d5

Branch: refs/heads/master
Commit: 4d4151d537423caa428df0c89e60ccafd74fa8fe
Parents: 01ff302
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 20:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 1 20:56:26 2014 +0200

----------------------------------------------------------------------
 .../flink/example/java/misc/PiEstimation.java   | 108 +++++++++++++++
 .../flink/example/java/pi/PiEstimation.java     | 137 -------------------
 2 files changed, 108 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d4151d5/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
new file mode 100644
index 0000000..ef336da
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.example.java.misc;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/** 
+ * Estimates the value of Pi using the Monte Carlo method.
+ * The area of a circle is Pi * R^2, R being the radius of the circle 
+ * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
+ * 
+ * Thus Pi = 4 * (area of circle / area of square).
+ * 
+ * The idea is to find a way to estimate the circle to square area ratio.
+ * The Monte Carlo method suggests collecting random points (within the square)
+ * and then counting the number of points that fall within the circle
+ * 
+ * <pre>
+ * {@code
+ * x = Math.random()
+ * y = Math.random()
+ * 
+ * x * x + y * y < 1
+ * }
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class PiEstimation implements java.io.Serializable {
+	
+	
+	public static void main(String[] args) throws Exception {
+
+		final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// count how many of the samples would randomly fall into
+		// the unit circle
+		DataSet<Long> count = 
+				env.generateSequence(1, numSamples)
+				.map(new Sampler())
+				.reduce(new SumReducer());
+
+		// the ratio of the unit circle surface to 4 times the unit square is pi
+		DataSet<Double> pi = count
+				.map(new MapFunction<Long, Double>() {
+					public Double map(Long value) {
+						return value * 4.0 / numSamples;
+					}
+				});
+
+		System.out.println("We estimate Pi to be:");
+		pi.print();
+
+		env.execute();
+	}
+
+	//*************************************************************************
+	//     USER FUNCTIONS
+	//*************************************************************************
+	
+	
+	/** 
+	 * Sampler randomly emits points that fall within a square of edge x * y.
+	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
+	 * If the distance is less than 1, then and only then does it returns a 1.
+	 */
+	public static class Sampler implements MapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long value) throws Exception{
+			double x = Math.random();
+			double y = Math.random();
+			return (x * x + y * y) < 1 ? 1L : 0L;
+		}
+	}
+
+	
+	/** 
+	 * Simply sums up all long values.
+	 */
+	public static final class SumReducer implements ReduceFunction<Long>{
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return value1 + value2;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d4151d5/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
deleted file mode 100644
index a09a2d6..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/pi/PiEstimation.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.pi;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
-
-/** 
- * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle 
- * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- * 
- * Thus Pi = 4 * (area of circle / area of square).
- * 
- * The idea is to find a way to estimate the circle to square area ratio.
- * The Monte Carlo method suggests collecting random points (within the square)
- * ```
- * x = Math.random() * 2 - 1
- * y = Math.random() * 2 - 1
- * ```
- * then counting the number of points that fall within the circle 
- * ```
- * x * x + y * y < 1
- * ```
- */
-public class PiEstimation {
-	
-	public static void main(String[] args) throws Exception {
-
-		//Sets up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		//Sets the degree of parallelism
-		int degOfParal = (env.getDegreeOfParallelism() > 0) ? env.getDegreeOfParallelism() : 2;
-		
-		int n = 100000 * degOfParal;
-		
-		DataSet<Integer> dataSet = env.generateSequence(0l, n)
-				.map(new MapFunction<Long, Integer>() {
-					private static final long serialVersionUID = 1L;
-					
-					//Converts from Long to Integer, explicitly choosing "1" as the returned value. 
-					//(Will later be used by the mapper for summation purposes.)
-					@Override
-					public Integer map(Long value) throws Exception {
-						return 1;
-					}
-				});
-
-		DataSet<Double> count = dataSet
-				.filter(new PiFilter())
-				.setParallelism(degOfParal)
-				.reduce(new PiReducer())
-				.map(new PiMapper(n));
-
-		System.out.println("We estimate Pi to be:");
-		count.print();
-
-		env.execute();
-	}
-
-	//*************************************************************************
-	//     USER FUNCTIONS
-	//*************************************************************************
-	
-	// FilterFunction that filters out all Integers smaller than zero.
-	
-	/** 
-	 * PiFilter randomly emits points that fall within a square of edge 2*x = 2*y = 2.
-	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
-	 * If the distance is less than 1, then and only then does it return a value (in this case 1) - later used by PiMapper.
-	 */
-	public static class PiFilter extends FilterFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Integer value) throws Exception{
-			double x = Math.random() * 2 - 1;
-			double y = Math.random() * 2 - 1;
-			return (x * x + y * y) < 1;
-		}
-	}
-
-	
-	/** 
-	 * PiReducer takes over the filter. It goes through the selected 1s and returns the sum.
-	 */
-	public static final class PiReducer extends ReduceFunction<Integer>{
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-	
-	
-	/** 
-	 * The PiMapper's role is to apply one final operation on the count thus returning the estimated Pi value.
-	 */
-	public static final class PiMapper extends MapFunction<Integer,Double> {
-		private static final long serialVersionUID = 1L;
-		private int n;
-		
-		public PiMapper(int n) {
-			this.n = n;
-		}
-		
-		@Override
-		public Double map(Integer intSum) throws Exception {
-			return intSum*4.0 / this.n;
-		}
-	}
-	
-}
\ No newline at end of file