Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2015/09/09 04:26:39 UTC

[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1110

    [FLINK-2533] [core] Gap based random sample optimization.

    For samplers that take a fraction, such as BernoulliSampler and PoissonSampler, gap-based sampling can use an O(np) implementation (n input elements, fraction p) instead of the previous O(n) implementation, so it should perform better when the sample fraction is very small. For large fractions it is better to keep the previous implementation, so we add a threshold that selects the sampling method according to the fraction (threshold_Bernoulli = 0.33, threshold_Poisson = 0.4).
    ![bernoullisampler](https://cloud.githubusercontent.com/assets/12931563/9751893/fd4195a2-56dc-11e5-8937-30ebfa927960.PNG)
    ![poissonsampler](https://cloud.githubusercontent.com/assets/12931563/9751894/fd4c35a2-56dc-11e5-9d71-e7b62e5dcc05.PNG)
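
For readers without the patch at hand, here is a minimal sketch of the idea described above for the Bernoulli case. It is not the code from this pull request: the class name `BernoulliGapSketch`, the method `sample`, and the `EPSILON` value are assumptions made for illustration, and only the 0.33 threshold is taken from the description. Below the threshold it draws one random number per sampled element and skips a geometrically distributed gap; above it, it falls back to one draw per input element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

final class BernoulliGapSketch {
    // Threshold quoted in the PR description for the Bernoulli sampler.
    private static final double THRESHOLD = 0.33;
    // Assumed lower bound on the uniform draw; guards against log(0).
    private static final double EPSILON = 1e-12;

    static <T> List<T> sample(Iterator<T> input, double fraction, Random random) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        List<T> out = new ArrayList<>();
        if (fraction == 0.0) {
            return out; // nothing can be selected
        }
        if (fraction <= THRESHOLD) {
            // Gap sampling: the number of rejected elements before the next
            // accepted one is geometric, so draw it directly.
            double logKeep = Math.log1p(-fraction); // log(1 - fraction), negative
            while (true) {
                double u = Math.max(random.nextDouble(), EPSILON);
                int gap = (int) (Math.log(u) / logKeep);
                for (int skipped = 0; skipped < gap && input.hasNext(); skipped++) {
                    input.next();
                }
                if (!input.hasNext()) {
                    break; // input ended inside the gap: nothing more to emit
                }
                out.add(input.next());
            }
        } else {
            // Classic Bernoulli trial for every element.
            while (input.hasNext()) {
                T element = input.next();
                if (random.nextDouble() < fraction) {
                    out.add(element);
                }
            }
        }
        return out;
    }
}
```

The sketch collects results into a list for brevity; an iterator-based sampler would apply the same gap computation lazily, once per call that advances to the next sampled element.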

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink gap_sampling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1110
    
----
commit b4b431471736cd32a50b49cc9e91e038a4387808
Author: gallenvara <ga...@126.com>
Date:   2015-09-07T06:55:11Z

    [FLINK-2533] [core] Gap based random sample optimization.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39003481
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -93,29 +98,59 @@ public boolean hasNext() {
     					}
     				}
     			}
    -
    -			private void moveToNextElement() {
    -				while (input.hasNext()) {
    +			
    +			public int poisson_ge1(double p){
    +				// sample 'k' from Poisson(p), conditioned to k >= 1
    +				double q = Math.pow(Math.E, -p);
    +				// simulate a poisson trial such that k >= 1
    +				double t = q + (1 - q)*random.nextDouble();
    --- End diff --
    
    Format: add a space before and after *.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1110#issuecomment-140058876
  
    @gallenvara, thanks for the PR.
    Looks good to me.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39003759
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -93,29 +98,59 @@ public boolean hasNext() {
     					}
     				}
     			}
    -
    -			private void moveToNextElement() {
    -				while (input.hasNext()) {
    +			
    +			public int poisson_ge1(double p){
    --- End diff --
    
    Why implement a new method instead of reusing poissonDistribution.sample() as before?
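
As background to this question: the comment in the quoted diff says the new method samples k from Poisson(p) conditioned on k >= 1. In gap sampling the skipped elements are the ones with count zero, so the element reached after a gap needs a count of at least one, which a plain Poisson draw does not guarantee. The following is a sketch of that conditioned draw, following the multiplication scheme visible in the diff; the class and method names are hypothetical, and `Math.exp` is used here in place of `Math.pow(Math.E, -p)`.

```java
import java.util.Random;

final class TruncatedPoissonSketch {
    /** Draws k from Poisson(mean) conditioned on k >= 1. */
    static int poissonAtLeastOne(double mean, Random random) {
        if (!(mean > 0.0)) {
            throw new IllegalArgumentException("mean must be positive");
        }
        double q = Math.exp(-mean); // probability of drawing zero
        // Force the first uniform factor into [q, 1), which is exactly the
        // event "the unconditioned draw would not have returned zero".
        double t = q + (1.0 - q) * random.nextDouble();
        int k = 1;
        // Continue the standard multiply-uniforms Poisson generation.
        t *= random.nextDouble();
        while (t > q) {
            k++;
            t *= random.nextDouble();
        }
        return k;
    }
}
```

The expected value of this conditioned draw is mean / (1 - exp(-mean)), which gives a simple sanity check for an implementation.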



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39003293
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java ---
    @@ -102,15 +106,31 @@ public T next() {
     			}
     
     			private T getNextSampledElement() {
    -				while (input.hasNext()) {
    -					T element = input.next();
    -
    -					if (random.nextDouble() <= fraction) {
    +				if (fraction <= THRESHOLD) {
    +					double rand = random.nextDouble();
    +					double u = Math.max(rand, EPSILON);
    +					int gap = (int) (Math.log(u) / Math.log(1 - fraction));
    +					int elementCount = 0;
    +					if (input.hasNext()) {
    +						T element = input.next();
    +						while (input.hasNext() && elementCount < gap) {
    --- End diff --
    
    When gap is larger than the number of remaining input elements, this should return null, right? But the implementation here returns the last input element.
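
One way to get the behaviour asked for here, shown as a sketch rather than as the change that was eventually merged: skip the gap first, and only then decide whether an element is left to return. The helper name `skipThenTake` and its class are hypothetical.

```java
import java.util.Iterator;

final class GapSkipSketch {
    /**
     * Skips up to gap elements, then returns the next element.
     * Returns null when the input is exhausted during or right after the gap.
     */
    static <T> T skipThenTake(Iterator<T> input, int gap) {
        int skipped = 0;
        while (skipped < gap && input.hasNext()) {
            input.next();
            skipped++;
        }
        // If the input ran out while skipping, hasNext() is false here,
        // so the last skipped element is never returned by mistake.
        return input.hasNext() ? input.next() : null;
    }
}
```

With input [1, 2, 3], a gap of 1 yields 2, and a following gap of 5 yields null instead of 3. Using null as the end marker assumes the input itself contains no null elements.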



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39002678
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java ---
    @@ -28,12 +28,16 @@
      * Bernoulli experiment.
      *
      * @param <T> The type of sample.
    + * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
      */
     public class BernoulliSampler<T> extends RandomSampler<T> {
     	
     	private final double fraction;
     	private final Random random;
     	
    +	//THRESHOLD	 is a tuning parameter for choosing sampling method according to the fraction
    --- End diff --
    
    Format: add a space after //, remove the extra whitespace after THRESHOLD, and end the comment with a period.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1110#issuecomment-140212159
  
    I will merge this PR tomorrow.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39002786
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -28,11 +29,16 @@
      *
      * @param <T> The type of sample.
      * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
    + * @see <a href="http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/">Gap Sampling</a>
      */
     public class PoissonSampler<T> extends RandomSampler<T> {
     	
     	private PoissonDistribution poissonDistribution;
     	private final double fraction;
    +	private final Random random = new Random();
    --- End diff --
    
    The "random" instance should be initialized in the constructor, using the seed parameter when one is available.
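
A sketch of the suggested pattern, using a stand-in class rather than the real PoissonSampler: the class `SeededSamplerSketch`, its constructors, and `nextUniform` are hypothetical and do not reproduce Flink's actual signatures. The point is that the `Random` field is assigned in the constructors, so a caller-supplied seed makes the sampling reproducible.

```java
import java.util.Random;

final class SeededSamplerSketch {
    private final double fraction;
    private final Random random;

    /** Unseeded variant: results differ from run to run. */
    SeededSamplerSketch(double fraction) {
        this(fraction, new Random());
    }

    /** Seeded variant: the same seed reproduces the same sample. */
    SeededSamplerSketch(double fraction, long seed) {
        this(fraction, new Random(seed));
    }

    private SeededSamplerSketch(double fraction, Random random) {
        if (fraction < 0.0) {
            throw new IllegalArgumentException("fraction must be non-negative");
        }
        this.fraction = fraction;
        this.random = random;
    }

    double fraction() {
        return fraction;
    }

    /** Exposes the generator's next draw so reproducibility can be checked. */
    double nextUniform() {
        return random.nextDouble();
    }
}
```

A field initializer such as `new Random()` at the declaration site cannot see constructor arguments, which is why the seed would otherwise be ignored.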



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39002941
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java ---
    @@ -102,15 +106,31 @@ public T next() {
     			}
     
     			private T getNextSampledElement() {
    -				while (input.hasNext()) {
    -					T element = input.next();
    -
    -					if (random.nextDouble() <= fraction) {
    +				if (fraction <= THRESHOLD) {
    +					double rand = random.nextDouble();
    +					double u = Math.max(rand, EPSILON);
    +					int gap = (int) (Math.log(u) / Math.log(1 - fraction));
    +					int elementCount = 0;
    +					if (input.hasNext()) {
    +						T element = input.next();
    +						while (input.hasNext() && elementCount < gap) {
    +							element = input.next();
    +							elementCount++;
    +						}
     						return element;
    +					} else {
    +						return null;
     					}
    -				}
    +				}else {
    --- End diff --
    
    Format: add a blank between right brace and "else".



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39002862
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -93,29 +98,59 @@ public boolean hasNext() {
     					}
     				}
     			}
    -
    -			private void moveToNextElement() {
    -				while (input.hasNext()) {
    +			
    +			public int poisson_ge1(double p){
    +				// sample 'k' from Poisson(p), conditioned to k >= 1
    +				double q = Math.pow(Math.E, -p);
    +				// simulate a poisson trial such that k >= 1
    +				double t = q + (1 - q)*random.nextDouble();
    +				int k = 1;
    +				// continue standard poisson generation trials
    +				t = t * random.nextDouble();
    +				while (t > q) {
    +					k++;
    +					t = t * random.nextDouble();
    +				}
    +				return k;
    +			}
    +			
    +			private void moveToNextElement(int num) {
    +				// skip elements with replication factor zero
    +				int elementCount = 0;
    +				while (input.hasNext() && elementCount < num){
     					currentElement = input.next();
    -					currentCount = poissonDistribution.sample();
    -					if (currentCount > 0) {
    -						break;
    +					elementCount++;
    +				}
    +			}
    +			
    +			private void samplingProcess(){
    +				if (fraction <= THRESHOLD) {
    +					double u = Math.max(random.nextDouble(), EPSILON);
    +					int gap = (int) (Math.log(u) / -fraction);
    +					moveToNextElement(gap);
    +					if (input.hasNext()) {
    +						currentElement = input.next();
    +						currentCount = poisson_ge1(fraction);
    +					}
    +				}
    +				else {
    --- End diff --
    
    Format: else should follow the right curly brace on the same line.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39003398
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -93,29 +98,59 @@ public boolean hasNext() {
     					}
     				}
     			}
    -
    -			private void moveToNextElement() {
    -				while (input.hasNext()) {
    +			
    +			public int poisson_ge1(double p){
    +				// sample 'k' from Poisson(p), conditioned to k >= 1
    +				double q = Math.pow(Math.E, -p);
    +				// simulate a poisson trial such that k >= 1
    +				double t = q + (1 - q)*random.nextDouble();
    +				int k = 1;
    +				// continue standard poisson generation trials
    +				t = t * random.nextDouble();
    +				while (t > q) {
    +					k++;
    +					t = t * random.nextDouble();
    +				}
    +				return k;
    +			}
    +			
    +			private void moveToNextElement(int num) {
    +				// skip elements with replication factor zero
    +				int elementCount = 0;
    +				while (input.hasNext() && elementCount < num){
     					currentElement = input.next();
    -					currentCount = poissonDistribution.sample();
    -					if (currentCount > 0) {
    -						break;
    +					elementCount++;
    +				}
    +			}
    +			
    +			private void samplingProcess(){
    +				if (fraction <= THRESHOLD) {
    +					double u = Math.max(random.nextDouble(), EPSILON);
    +					int gap = (int) (Math.log(u) / -fraction);
    +					moveToNextElement(gap);
    +					if (input.hasNext()) {
    +						currentElement = input.next();
    +						currentCount = poisson_ge1(fraction);
    +					}
    +				}
    +				else {
    +					while (input.hasNext()){
    +						currentElement = input.next();
    +						currentCount = poissonDistribution.sample();
    +						if (currentCount > 0) {
    +							break;
    +						}
     					}
     				}
     			}
     			
     			@Override
     			public T next() {
    --- End diff --
    
    Format: group the public methods at the top and the private methods at the bottom.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1110#discussion_r39003867
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java ---
    @@ -93,29 +98,59 @@ public boolean hasNext() {
     					}
     				}
     			}
    -
    -			private void moveToNextElement() {
    -				while (input.hasNext()) {
    +			
    +			public int poisson_ge1(double p){
    +				// sample 'k' from Poisson(p), conditioned to k >= 1
    +				double q = Math.pow(Math.E, -p);
    +				// simulate a poisson trial such that k >= 1
    +				double t = q + (1 - q)*random.nextDouble();
    +				int k = 1;
    +				// continue standard poisson generation trials
    +				t = t * random.nextDouble();
    +				while (t > q) {
    +					k++;
    +					t = t * random.nextDouble();
    +				}
    +				return k;
    +			}
    +			
    +			private void moveToNextElement(int num) {
    --- End diff --
    
    This method only skips the gap elements; the move to the next sampled element actually happens in ```samplingProcess```. How about renaming it to ```skipGapElements```?



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1110#issuecomment-138766071
  
    Thanks for contributing this, @gallenvara. The micro benchmark result is quite impressive and the patch looks good. I left some comments, most of them about formatting.



[GitHub] flink pull request: [FLINK-2533] [core] Gap based random sample op...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1110

