You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2016/03/09 03:24:58 UTC

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1776

    [FLINK-2997] Support range partition with user customized data distribution.

    Sometime user have better knowledge of the source data, and they can build customized `data distribution` to do range partition more efficiently.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink flink-2997

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1776.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1776
    
----
commit 1e00767da3e83d688847ffe5c4ab77cab68f2062
Author: gallenvara <ga...@126.com>
Date:   2016-01-27T17:34:10Z

    Enable range partition with custom data distribution.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57141686
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist1 dist = new TestDataDist1(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = env.fromElements(
    +						new Tuple3<>(1, 5L, "Hi"),
    +						new Tuple3<>(1, 11L, "Hello"),
    +						new Tuple3<>(2, 3L, "World"),
    +						new Tuple3<>(2, 13L, "Hello World"),
    +						new Tuple3<>(3, 8L, "Say"),
    +						new Tuple3<>(4, 0L, "Why"),
    +						new Tuple3<>(4, 2L, "Java"),
    +						new Tuple3<>(4, 11L, "Say Hello"),
    +						new Tuple3<>(5, 2L, "Hi Java"));
    +
    +		final TestDataDist2 dist = new TestDataDist2(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					
    +					if ((s.f0 > partitionIndex + 1) || ((s.f0 == partitionIndex + 1) && (s.f1 > dist.rightBoundary[partitionIndex]))) {
    +							fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with one key.
    +	 */
    +	public static class TestDataDist1 implements DataDistribution {
    +
    +		private int dim;
    +
    +		public TestDataDist1() {}
    +
    +		/**
    +		 * Constructor of the customized distribution for range partition.
    +		 * @param dim the number of the fields.
    +		 */
    +		public TestDataDist1(int dim) {
    +			this.dim = dim;
    +		}
    +
    +		public int getParallelism() {
    +			return 3;
    +		}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +
    +			/*
    +			for the first test, the boundary is just like : 
    +			(0, 7]
    +			(7, 14]
    +			(14, 21]
    +			 */
    +			return new Integer[]{(bucketNum + 1) * 7};
    +		}
    +
    +		@Override
    +		public int getNumberOfFields() {
    +			return this.dim;
    +		}
    +
    +		@Override
    +		public TypeInformation[] getKeyTypes() {
    +			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
    +		}
    +
    +		@Override
    +		public void write(DataOutputView out) throws IOException {
    +			out.writeInt(this.dim);
    +		}
    +
    +		@Override
    +		public void read(DataInputView in) throws IOException {
    +			this.dim = in.readInt();
    +		}
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with two keys.
    +	 */
    +	public static class TestDataDist2 implements DataDistribution {
    +
    +		public int rightBoundary[] = new int[]{6, 4, 9, 1, 2};
    +		private int dim;
    --- End diff --
    
    `dim` should always be `2`. Remove the field, the constructor, and update `write()` and `read()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57140198
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -110,52 +118,93 @@ public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Coll
     	}
     
     	/**
    -	 * The class is used to do the tests of range partition with customed data distribution.
    +	 * The class is used to do the tests of range partition with one key.
     	 */
    -	public static class TestDataDist implements DataDistribution {
    +	public static class TestDataDist1 implements DataDistribution {
     
     		private int dim;
     
    -		public TestDataDist() {}
    +		public TestDataDist1() {}
     
     		/**
     		 * Constructor of the customized distribution for range partition.
     		 * @param dim the number of the fields.
     		 */
    -		public TestDataDist(int dim) {
    +		public TestDataDist1(int dim) {
     			this.dim = dim;
     		}
     
     		public int getParallelism() {
    -			if (dim == 1) {
    -				return 3;
    -			}
    -			return 6;
    +			return 3;
     		}
     
     		@Override
     		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    -			if (dim == 1) {
    -				/*
    -				for the first test, the boundary is just like : 
    -				(0, 7]
    -				(7, 14]
    -				(14, 21]
    -				 */
    -
    -				return new Integer[]{(bucketNum + 1) * 7};
    -			}
    +
     			/*
    -			for the second test, the boundary is just like : 
    -			(0, 1], (0, 1]
    -			(1, 3], (1, 2]
    -			(3, 6], (2, 3]
    -			(6, 10], (3, 4]
    -			(10, 15], (4, 5]
    -			(15, 21], (5, 6]
    +			for the first test, the boundary is just like : 
    +			(0, 7]
    +			(7, 14]
    +			(14, 21]
     			 */
    +			return new Integer[]{(bucketNum + 1) * 7};
    +		}
    +
    +		@Override
    +		public int getNumberOfFields() {
    +			return this.dim;
    +		}
     
    -			return new Integer[]{(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1};
    +		@Override
    +		public TypeInformation[] getKeyTypes() {
    +			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
    +		}
    +
    +		@Override
    +		public void write(DataOutputView out) throws IOException {
    +			out.writeInt(this.dim);
    +		}
    +
    +		@Override
    +		public void read(DataInputView in) throws IOException {
    +			this.dim = in.readInt();
    +		}
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with two keys.
    +	 */
    +	public static class TestDataDist2 implements DataDistribution {
    +
    +		public int rightBoundary[] = new int[]{6, 4, 9, 1, 2};
    +		private int dim;
    --- End diff --
    
    `dim` should always be `2`. Remove the field, the constructor, and update `write()` and `read()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344323
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    --- End diff --
    
    This does only check one side of the bucket boundary. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56980076
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 ||
    +							s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 ||
    +							s.f1 - 1 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with customed data distribution.
    +	 */
    +	public static class TestDataDist implements DataDistribution {
    +
    +		private int dim;
    +
    +		public TestDataDist() {}
    +
    +		/**
    +		 * Constructor of the customized distribution for range partition.
    +		 * @param dim the number of the fields.
    +		 */
    +		public TestDataDist(int dim) {
    +			this.dim = dim;
    +		}
    +
    +		public int getParallelism() {
    +			if (dim == 1) {
    +				return 3;
    +			}
    +			return 6;
    +		}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +			if (dim == 1) {
    +				/*
    +				for the first test, the boundary is just like : 
    +				(0, 7]
    +				(7, 14]
    +				(14, 21]
    +				 */
    +
    +				return new Integer[]{(bucketNum + 1) * 7};
    +			}
    +			/*
    +			for the second test, the boundary is just like : 
    +			(0, 1], (0, 1]
    --- End diff --
    
    I do not quite understand about the partition result and why the common distribution should be like this. Can you explain to me in detail? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-200255451
  
    @fhueske , PR has been updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-198676014
  
    @fhueske , thanks a lot for review work, codes have been modified based on your advice. I change the second test with modifying the range boundary from `(bucketNum+1)*7,bucketNum*2+3` to `(bucketNum+1)*(bucketNum+2)/2,bucketNum+1`. The reason for this is for the record `Tuple3(15, 5L, "Comment#9")`, it will be emitted to `partition:2` with first field and `partition:1` with second field and the test will be failed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56464954
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    +						boo = false;
    +					}
    +				}
    +				out.collect(boo);
    +			}
    +		});
    +
    +		List<Boolean> result = out1.collect();
    +		for (int i = 0; i < result.size(); i++) {
    --- End diff --
    
    IMO, the boolean `MapPartition` returned only tell us the record distributed rightly in a partition, iterating the list can validate the correctness of all partitions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344480
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    --- End diff --
    
    IMO, it would be more readable to use the boundaries directly instead of fetching them from the distribution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660703
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    --- End diff --
    
    Code style, please do not put the opening curly brace in a new line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660531
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    --- End diff --
    
    You can use the built-in data sets like all other tests: `CollectionDataSets.get3TupleDataSet(env)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344082
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    --- End diff --
    
    I think we do not need the `CustomDistribution` class. It is quite heavy and hard to read for a test class. 
    IMO, a static `DataDistribution` class like the following would be sufficient for these tests:
    
    ```
    public static final class TestDataDist implements DataDistribution {
    
    	@Override
    	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    		return new Integer[]{(bucketNum+1)*2};
    	}
    
    	@Override
    	public int getNumberOfFields() {
    		return 1;
    	}
    
    	@Override
    	public void write(DataOutputView out) throws IOException { }
    
    	@Override
    	public void read(DataInputView in) throws IOException { }
    }
    
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56332311
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java ---
    @@ -221,7 +223,7 @@ public void postVisit(PlanNode node) {
     		prRemoverNode.setParallelism(targetParallelism);
     		prPlanNode.setParallelism(targetParallelism);
     		GlobalProperties globalProperties = new GlobalProperties();
    -		globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING));
    +		globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING), channel.getDataDistribution());
    --- End diff --
    
    can you pass `null` instead of `channel.getDataDistribution()` to make it clear that no provided data distribution is available?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56343356
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---
    @@ -80,9 +83,15 @@ public void setHashPartitioned(FieldList partitionedFields) {
     		this.partitioningFields = partitionedFields;
     		this.ordering = null;
     	}
    -	
     
    -	public void setRangePartitioned(Ordering ordering) {
    +	/**
    +	 * Set the parameters for range partition.
    +	 * 
    +	 * @param ordering Order of the partitioned fields
    +	 * @param distribution The data distribution for range partition. User can supply a customized data distribution,
    +	 *                     also the data distribution can be null.  
    +	 */
    +	public void setRangePartitioned(Ordering ordering, DataDistribution distribution) {
    --- End diff --
    
    I know you've changed this part before, but could you add a `setRangePartitioned(Ordering ordering)` method which can be used if no `DataDistribution` is provided? Would be nicer than passing `null` everywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660236
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java ---
    @@ -113,10 +113,12 @@ public void postVisit(PlanNode node) {
     					throw new InvalidProgramException("Range Partitioning not supported within iterations.");
    --- End diff --
    
    range partitioning is possible if the data distribution is provided. Please refactor this code accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56668261
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.distribution;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import java.io.IOException;
    +
    +/**
    + * The class is used to do the tests of range partition with customed data distribution.
    + */
    +public class TestDataDist implements DataDistribution {
    --- End diff --
    
    Thanks, @fhueske . I will modify the code tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57139893
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -110,52 +118,93 @@ public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Coll
     	}
     
     	/**
    -	 * The class is used to do the tests of range partition with customed data distribution.
    +	 * The class is used to do the tests of range partition with one key.
     	 */
    -	public static class TestDataDist implements DataDistribution {
    +	public static class TestDataDist1 implements DataDistribution {
     
     		private int dim;
    --- End diff --
    
    No need for `dim`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-194697700
  
    Just a minor comment, mostly looks good to me. @fhueske , do you want to take a look at this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344684
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    +						boo = false;
    +					}
    +				}
    +				out.collect(boo);
    +			}
    +		});
    +
    +		List<Boolean> result = out1.collect();
    +		for (int i = 0; i < result.size(); i++) {
    --- End diff --
    
    If the `MapPartition` only returns a single boolean at the end, we do not need to iterate of this list.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-194310643
  
    @fhueske @ChengXiangLi  Can you please help me with review work? The error of CI build failure is not relevant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55659105
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -82,6 +88,7 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.pKeys = pKeys;
     		this.partitionLocationName = partitionLocationName;
     		this.customPartitioner = customPartitioner;
    +		this.distribution = distribution;
    --- End diff --
    
    Please add a check that a `distribution` is only set if `pMethod == PartitionMethod.RANGE`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56907132
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -45,35 +46,48 @@
     	private final PartitionMethod pMethod;
     	private final String partitionLocationName;
     	private final Partitioner<?> customPartitioner;
    -	
    -	
    +	private final DataDistribution distribution;
    +
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
    -		this(input, pMethod, pKeys, null, null, partitionLocationName);
    +		this(input, pMethod, pKeys, null, null, null, partitionLocationName);
     	}
    -	
    +
    +	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
    +		this(input, pMethod, pKeys, null, null, distribution, partitionLocationName);
    +	}
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
    -		this(input, pMethod, null, null, null, partitionLocationName);
    +		this(input, pMethod, null, null, null, null, partitionLocationName);
     	}
     	
     	public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
     	}
     	
    -	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    +	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
     			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
     	{
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
     	}
     	
    -	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    -			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
    +	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
    +			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
     	{
     		super(input, input.getType());
     		
     		Preconditions.checkNotNull(pMethod);
     		Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
     		Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partioning requires a partitioner.");
    -
    +		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
    +		
    +		if (distribution != null) {
    +			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    +			for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
    +				Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]), "The types of key from the distribution and range partitioner are not equal.");
    --- End diff --
    
    can you fetch the key type arrays only once and not for every key?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57141500
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -94,12 +104,10 @@ public void testRangeWithDistribution2() throws Exception{
     			@Override
     			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
     				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    -
     				for (Tuple3<Integer, Integer, String> s : values) {
    -					if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 ||
    -							s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 ||
    -							s.f1 - 1 != partitionIndex) {
    -						fail("Record was not correctly partitioned: " + s.toString());
    +					
    +					if ((s.f0 > partitionIndex + 1) || ((s.f0 == partitionIndex + 1) && (s.f1 > dist.rightBoundary[partitionIndex]))) {
    --- End diff --
    
    The test checks only one bucket boundary. We need to check 
    - the upper boundary if `partitionIndex == 0`
    - lower and upper boundaries if `0 < `partitionIndex` < 4`
    - the lower boundary if `partitionIndex == 4.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-197744696
  
    Hi, @fhueske . Thanks a lot for your patient review. I have modified the code based on your advice.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-196245218
  
    Hi, @fhueske . I have modified the relevant code.  I still use the generic class `CustomDistribution` for the tests because it is not flexible with build-in data, IMO. Also it is not good if mix the logic of distribution to the tests. Can you please help with review?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56661891
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -82,6 +88,7 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.pKeys = pKeys;
     		this.partitionLocationName = partitionLocationName;
     		this.customPartitioner = customPartitioner;
    +		this.distribution = distribution;
    --- End diff --
    
    We should check here that the key types of `distribution.getKeyTypes()` are equal to `pKeys.getKeyFieldTypes()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344247
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    --- End diff --
    
    This map function is not necessary, if the `DataDistribution` returns `Integer` instead of `IntValue`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-194786495
  
    Thanks @gallenvara for opening the PR. Looks mostly good but the tests need to be improved, IMO.
    
    It would also be good to extend the `DataDistribution` interface by adding a method `TypeInformation[] getKeyTypes()`. This will allow us to validate the key types of the `DataDistribution` and the specified keys in the `PartitionOperator`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55801308
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    +				IntValue key1;
    +				IntValue key2;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					key2 = new IntValue(s.f1);
    +					out.collect(new Tuple2<>(key1, key2));
    +				}
    +			}
    +		}), cd, 0, 1).groupBy(0).sum(0);
    --- End diff --
    
    If you implement a `RichMapPartitionFunction` you can get the index of the current subtask by calling `getRuntimeContext().getIndexOfThisSubtask()`. With the subtask index and knowledge about the distribution, you can check that all data was sent to the right subtask.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56471182
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    +						boo = false;
    +					}
    +				}
    +				out.collect(boo);
    +			}
    +		});
    +
    +		List<Boolean> result = out1.collect();
    +		for (int i = 0; i < result.size(); i++) {
    --- End diff --
    
    Yes. I just thought, that MapPartitionFunction could return only a single boolean values. `true` if all records are correctly partitioned, `false` otherwise. Then we could avoid iterating over the result list.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-197057859
  
    Sorry @gallenvara, I was busy the last days. I will look into this PR tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57143853
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist1 dist = new TestDataDist1(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = env.fromElements(
    +						new Tuple3<>(1, 5L, "Hi"),
    +						new Tuple3<>(1, 11L, "Hello"),
    +						new Tuple3<>(2, 3L, "World"),
    +						new Tuple3<>(2, 13L, "Hello World"),
    +						new Tuple3<>(3, 8L, "Say"),
    +						new Tuple3<>(4, 0L, "Why"),
    +						new Tuple3<>(4, 2L, "Java"),
    +						new Tuple3<>(4, 11L, "Say Hello"),
    +						new Tuple3<>(5, 2L, "Hi Java"));
    +
    +		final TestDataDist2 dist = new TestDataDist2(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					
    +					if ((s.f0 > partitionIndex + 1) || ((s.f0 == partitionIndex + 1) && (s.f1 > dist.rightBoundary[partitionIndex]))) {
    --- End diff --
    
    Sorry to forget it again :(
    I will modify the code now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55801128
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -82,6 +88,7 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.pKeys = pKeys;
     		this.partitionLocationName = partitionLocationName;
     		this.customPartitioner = customPartitioner;
    +		this.distribution = distribution;
    --- End diff --
    
    The check would prevent an invalid parameter combination and also shows readers of the code that such a combination is not valid. Hence, I think the check should be added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56665371
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.distribution;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import java.io.IOException;
    +
    +/**
    + * The class is used to do the tests of range partition with customed data distribution.
    + */
    +public class TestDataDist implements DataDistribution {
    --- End diff --
    
    This class is only used by `CustomDistributionITCase`. 
    Can you make it a `public static` inlined class to have all code for the test (including the hard coded bucket boundaries) in one class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56669620
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.test.distribution.TestDataDist;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				boolean checkPartition = true;
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) {
    --- End diff --
    
    This check is not working correctly. You can replace it by this one which asserts within the `MapPartitionFunction`:
    ```
    if ( (s.f0 - 1) / 7 != partitionIndex) {
      fail("Record was not correctly partitioned: "+s.toString());
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56664085
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.distribution;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import java.io.IOException;
    +
    +/**
    + * The class is used to do the tests of range partition with customed data distribution.
    + */
    +public class TestDataDist implements DataDistribution {
    +
    +	private int dim;
    +
    +	public TestDataDist() {}
    +
    +	/**
    +	 * Constructor of the customized distribution for range partition.
    +	 * @param dim the number of the fields.
    +	 */
    +	public TestDataDist(int dim) {
    +		this.dim = dim;
    +	}
    +
    +	public int getParallelism() {
    +		return 3;
    +	}
    +
    +	@Override
    +	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +		if (dim == 1) {
    +			return new Integer[]{(bucketNum + 1) * 7};
    +		}
    +		return new Integer[]{(bucketNum + 1) * 7, (bucketNum) * 2 + 3};
    +	}
    +
    +	@Override
    +	public int getNumberOfFields() {
    +		return this.dim;
    +	}
    +	
    +	@Override
    +	public TypeInformation[] getKeyTypes() {
    +		return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)};
    --- End diff --
    
    `BasicTypeInfo.INT_TYPE_INFO`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56961893
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 ||
    +							s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 ||
    +							s.f1 - 1 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with customed data distribution.
    +	 */
    +	public static class TestDataDist implements DataDistribution {
    --- End diff --
    
    The two distributions defined by this class have not a lot in common. How about we define two classes, one for each distribution?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55661252
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    +				IntValue key1;
    +				IntValue key2;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					key2 = new IntValue(s.f1);
    +					out.collect(new Tuple2<>(key1, key2));
    +				}
    +			}
    +		}), cd, 0, 1).groupBy(0).sum(0);
    +
    +		String expected = "[(1,3), (4,5), (2,2), (3,6), (5,10), (12,9)]";
    +		assertEquals(expected, out1.collect().toString());
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    --- End diff --
    
    Comments of the previous tests apply to this method as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56343484
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    --- End diff --
    
    Can you use a built-in data sets from `CollectionDataSets`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55639603
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +		
    +		IntValue[] keys = new IntValue[4];
    +		
    +		env.setParallelism(5);
    +
    +		for (int i = 0; i < keys.length; i++)
    +		{
    +			keys[i] = new IntValue(i + 1);
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple1<IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple1<IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple1<IntValue>> out) {
    +				IntValue key1;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					out.collect(new Tuple1<>(key1));
    +				}
    +			}
    +		}), cd, 0).groupBy(0).sum(0);
    +
    +		String expected = "[(3), (4), (3), (12), (5)]";
    +		assertEquals(expected, out1.collect().toString());
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> input1 = env.fromElements(
    +				new Tuple2<>(new Tuple2<>(1, 1), "Hi"),
    +				new Tuple2<>(new Tuple2<>(1, 2), "Hello"),
    +				new Tuple2<>(new Tuple2<>(1, 3), "Hello world"),
    +				new Tuple2<>(new Tuple2<>(2, 4), "Hello world, how are you?"),
    +				new Tuple2<>(new Tuple2<>(2, 5), "I am fine."),
    +				new Tuple2<>(new Tuple2<>(3, 6), "Luke Skywalker"),
    +				new Tuple2<>(new Tuple2<>(4, 7), "Comment#1"),
    +				new Tuple2<>(new Tuple2<>(4, 8), "Comment#2"),
    +				new Tuple2<>(new Tuple2<>(4, 9), "Comment#3"),
    +				new Tuple2<>(new Tuple2<>(5, 10), "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple1<IntValue>> out1= DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple1<Tuple2<IntValue, IntValue>>>() {
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, Collector<Tuple1<Tuple2<IntValue, IntValue>>> out) {
    +						IntValue key1;
    +						IntValue key2;
    +						for (Tuple2<Tuple2<Integer, Integer>, String> s : values) {
    +							key1 = new IntValue(s.f0.f0);
    +							key2 = new IntValue(s.f0.f1);
    +							out.collect(new Tuple1<>(new Tuple2<>(key1, key2)));
    +						}
    +					}
    +				}), cd, 0).mapPartition(new MapPartitionFunction<Tuple1<Tuple2<IntValue, IntValue>>, Tuple1<IntValue>>() {
    +			public void mapPartition(Iterable<Tuple1<Tuple2<IntValue, IntValue>>> values, Collector<Tuple1<IntValue>> out) {
    +				Tuple1<IntValue> key;
    +				for (Tuple1<Tuple2<IntValue, IntValue>> s : values) {
    +					key = new Tuple1<>(s.f0.f0);
    +					out.collect(key);
    +				}
    +			}
    +		}).groupBy(0).sum(0);
    +
    +		String expected = "[(1), (4), (2), (3), (5), (12)]";
    +		assertEquals(expected, out1.collect().toString());
    +	}
    --- End diff --
    
    Would you add a test which use 2 fields as partition key?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56661477
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/MockDistribution.java ---
    @@ -38,6 +40,11 @@ public int getNumberOfFields() {
     	}
     
     	@Override
    +	public TypeInformation[] getKeyTypes() {
    +		return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)};
    --- End diff --
    
    This can be simplified to `BasicTypeInfo.INT_TYPE_INFO`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56532523
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    +						boo = false;
    +					}
    +				}
    +				out.collect(boo);
    +			}
    +		});
    +
    +		List<Boolean> result = out1.collect();
    +		for (int i = 0; i < result.size(); i++) {
    --- End diff --
    
    Sorry @fhueske ,  I misunderstood your point and i have modified the code to avoid iterating. Can you give me some suggestions about other code? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55650289
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +		
    +		IntValue[] keys = new IntValue[4];
    +		
    +		env.setParallelism(5);
    +
    +		for (int i = 0; i < keys.length; i++)
    +		{
    +			keys[i] = new IntValue(i + 1);
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple1<IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple1<IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple1<IntValue>> out) {
    +				IntValue key1;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					out.collect(new Tuple1<>(key1));
    +				}
    +			}
    +		}), cd, 0).groupBy(0).sum(0);
    +
    +		String expected = "[(3), (4), (3), (12), (5)]";
    +		assertEquals(expected, out1.collect().toString());
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> input1 = env.fromElements(
    +				new Tuple2<>(new Tuple2<>(1, 1), "Hi"),
    +				new Tuple2<>(new Tuple2<>(1, 2), "Hello"),
    +				new Tuple2<>(new Tuple2<>(1, 3), "Hello world"),
    +				new Tuple2<>(new Tuple2<>(2, 4), "Hello world, how are you?"),
    +				new Tuple2<>(new Tuple2<>(2, 5), "I am fine."),
    +				new Tuple2<>(new Tuple2<>(3, 6), "Luke Skywalker"),
    +				new Tuple2<>(new Tuple2<>(4, 7), "Comment#1"),
    +				new Tuple2<>(new Tuple2<>(4, 8), "Comment#2"),
    +				new Tuple2<>(new Tuple2<>(4, 9), "Comment#3"),
    +				new Tuple2<>(new Tuple2<>(5, 10), "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple1<IntValue>> out1= DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple1<Tuple2<IntValue, IntValue>>>() {
    +					public void mapPartition(Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, Collector<Tuple1<Tuple2<IntValue, IntValue>>> out) {
    +						IntValue key1;
    +						IntValue key2;
    +						for (Tuple2<Tuple2<Integer, Integer>, String> s : values) {
    +							key1 = new IntValue(s.f0.f0);
    +							key2 = new IntValue(s.f0.f1);
    +							out.collect(new Tuple1<>(new Tuple2<>(key1, key2)));
    +						}
    +					}
    +				}), cd, 0).mapPartition(new MapPartitionFunction<Tuple1<Tuple2<IntValue, IntValue>>, Tuple1<IntValue>>() {
    +			public void mapPartition(Iterable<Tuple1<Tuple2<IntValue, IntValue>>> values, Collector<Tuple1<IntValue>> out) {
    +				Tuple1<IntValue> key;
    +				for (Tuple1<Tuple2<IntValue, IntValue>> s : values) {
    +					key = new Tuple1<>(s.f0.f0);
    +					out.collect(key);
    +				}
    +			}
    +		}).groupBy(0).sum(0);
    +
    +		String expected = "[(1), (4), (2), (3), (5), (12)]";
    +		assertEquals(expected, out1.collect().toString());
    +	}
    --- End diff --
    
    OK, i will modify the test code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660961
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    --- End diff --
    
    A `MapFunction` would be easier. Also preparing the data sets first before applying the `rangeByRange` would be more readable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56669959
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.test.distribution.TestDataDist;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				boolean checkPartition = true;
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) {
    +						checkPartition = false;
    +					}
    +				}
    +				out.collect(checkPartition);
    +			}                        
    +		}).reduce(new ReduceFunction<Boolean>() {
    +			@Override
    +			public Boolean reduce(Boolean value1, Boolean value2) throws Exception {
    +				return value1 && value2;
    +			}
    +		});
    +
    +		assertTrue("The record is not emitted to the right partition", result.collect().get(0));
    --- End diff --
    
    append an `result.output(new DiscardingOutputFormat());` and call `env.execute()` instead of `result.collect()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55661084
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    +				IntValue key1;
    +				IntValue key2;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					key2 = new IntValue(s.f1);
    +					out.collect(new Tuple2<>(key1, key2));
    +				}
    +			}
    +		}), cd, 0, 1).groupBy(0).sum(0);
    --- End diff --
    
    This test only checks that the partitioning worked correctly. It does not check that the values were distributed accordingly to the provided distribution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-200298878
  
    Thanks for the quick updates @gallenvara! I think we are almost done. The test distribution and input data looks good. Only the boundary checks need to be fixed.
    Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57141718
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist1 dist = new TestDataDist1(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = env.fromElements(
    +						new Tuple3<>(1, 5L, "Hi"),
    +						new Tuple3<>(1, 11L, "Hello"),
    +						new Tuple3<>(2, 3L, "World"),
    +						new Tuple3<>(2, 13L, "Hello World"),
    +						new Tuple3<>(3, 8L, "Say"),
    +						new Tuple3<>(4, 0L, "Why"),
    +						new Tuple3<>(4, 2L, "Java"),
    +						new Tuple3<>(4, 11L, "Say Hello"),
    +						new Tuple3<>(5, 2L, "Hi Java"));
    +
    +		final TestDataDist2 dist = new TestDataDist2(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					
    +					if ((s.f0 > partitionIndex + 1) || ((s.f0 == partitionIndex + 1) && (s.f1 > dist.rightBoundary[partitionIndex]))) {
    +							fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with one key.
    +	 */
    +	public static class TestDataDist1 implements DataDistribution {
    +
    +		private int dim;
    --- End diff --
    
    `dim` should always be `1`. Remove the field, the constructor, and update `write()` and `read()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660109
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---
    @@ -92,6 +95,10 @@ public void setRangePartitioned(Ordering ordering) {
     		this.partitioningFields = ordering.getInvolvedIndexes();
     	}
     	
    +	public void setDataDistribution(DataDistribution distribution) {
    --- End diff --
    
    Can you remove the `setDataDistribution` method and add a parameter to `setRangePartitioned()` instead. Also add JavaDocs to `setRangePartitioned` to explain that the distribution is optional and might be null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55661214
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    --- End diff --
    
    Please add comments about what the tests are doing and testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56344766
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		final IntValue[] keys = new IntValue[3];
    +
    +		for (int i = 0; i < 3; i++) {
    +			keys[i] = new IntValue((i + 1) * 2);
    +		}
    +
    +		final CustomDistribution cd = new CustomDistribution(keys);
    +
    +		env.setParallelism(3);
    +
    +		DataSet<Boolean> out1 = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			@Override
    +			public Tuple2<IntValue, IntValue> map(Tuple3<Integer, Integer, String> value) throws Exception {
    +				IntValue key1;
    +				IntValue key2;
    +				key1 = new IntValue(value.f0);
    +				key2 = new IntValue(value.f1);
    +				return new Tuple2<>(key1, key2);
    +			}
    +		}), cd, 0).mapPartition(new RichMapPartitionFunction<Tuple2<IntValue, IntValue>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple2<IntValue, IntValue>> values, Collector<Boolean> out) throws Exception {
    +				boolean boo = true;
    +				for (Tuple2<IntValue, IntValue> s : values) {
    +					IntValue intValues= (IntValue)cd.getBucketBoundary(getRuntimeContext().getIndexOfThisSubtask(), 3)[0];
    +					if (s.f0.getValue() > intValues.getValue()) {
    +						boo = false;
    +					}
    +				}
    +				out.collect(boo);
    +			}
    +		});
    +
    +		List<Boolean> result = out1.collect();
    +		for (int i = 0; i < result.size(); i++) {
    +			assertTrue("The record is not emitted to the right partition", result.get(i));
    +		}
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    --- End diff --
    
    Same issues with the other test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56669667
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.test.distribution.TestDataDist;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				boolean checkPartition = true;
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) {
    +						checkPartition = false;
    +					}
    +				}
    +				out.collect(checkPartition);
    --- End diff --
    
    No need to emit something if assert is done in function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57141747
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist1 dist = new TestDataDist1(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = env.fromElements(
    +						new Tuple3<>(1, 5L, "Hi"),
    +						new Tuple3<>(1, 11L, "Hello"),
    +						new Tuple3<>(2, 3L, "World"),
    +						new Tuple3<>(2, 13L, "Hello World"),
    +						new Tuple3<>(3, 8L, "Say"),
    +						new Tuple3<>(4, 0L, "Why"),
    +						new Tuple3<>(4, 2L, "Java"),
    +						new Tuple3<>(4, 11L, "Say Hello"),
    +						new Tuple3<>(5, 2L, "Hi Java"));
    +
    +		final TestDataDist2 dist = new TestDataDist2(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					
    +					if ((s.f0 > partitionIndex + 1) || ((s.f0 == partitionIndex + 1) && (s.f1 > dist.rightBoundary[partitionIndex]))) {
    --- End diff --
    
    The test checks only one bucket boundary. We need to check 
    - the upper boundary if `partitionIndex == 0`
    - lower and upper boundaries if `0 < `partitionIndex` < 4`
    - the lower boundary if `partitionIndex == 4.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r57078308
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 ||
    +							s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 ||
    +							s.f1 - 1 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with customed data distribution.
    +	 */
    +	public static class TestDataDist implements DataDistribution {
    +
    +		private int dim;
    +
    +		public TestDataDist() {}
    +
    +		/**
    +		 * Constructor of the customized distribution for range partition.
    +		 * @param dim the number of the fields.
    +		 */
    +		public TestDataDist(int dim) {
    +			this.dim = dim;
    +		}
    +
    +		public int getParallelism() {
    +			if (dim == 1) {
    +				return 3;
    +			}
    +			return 6;
    +		}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +			if (dim == 1) {
    +				/*
    +				for the first test, the boundary is just like : 
    +				(0, 7]
    +				(7, 14]
    +				(14, 21]
    +				 */
    +
    +				return new Integer[]{(bucketNum + 1) * 7};
    +			}
    +			/*
    +			for the second test, the boundary is just like : 
    +			(0, 1], (0, 1]
    --- End diff --
    
    Sure :-)
    The value distribution of the two key attributes in the data set used for the test is not optimal. Both attributes are increasing and hence somewhat correlated. The buckets of the test distribution reflect this correlation because the keys of both boundaries are increasing too. In fact, only a single attribute can be used to determine the correct result partition. So the tests do not show that both attributes are correctly evaluated to determine the result partition. 
    In my opinion, it would be better to use a data set where the partition keys are not correlated and have boundaries similar as the ones I posted before.
    Did that clarify my previous comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56916757
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -45,35 +46,48 @@
     	private final PartitionMethod pMethod;
     	private final String partitionLocationName;
     	private final Partitioner<?> customPartitioner;
    -	
    -	
    +	private final DataDistribution distribution;
    +
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
    -		this(input, pMethod, pKeys, null, null, partitionLocationName);
    +		this(input, pMethod, pKeys, null, null, null, partitionLocationName);
     	}
    -	
    +
    +	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
    +		this(input, pMethod, pKeys, null, null, distribution, partitionLocationName);
    +	}
    +
     	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
    -		this(input, pMethod, null, null, null, partitionLocationName);
    +		this(input, pMethod, null, null, null, null, partitionLocationName);
     	}
     	
     	public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
     	}
     	
    -	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    +	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
     			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
     	{
    -		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, partitionLocationName);
    +		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
     	}
     	
    -	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    -			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
    +	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
    +			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
     	{
     		super(input, input.getType());
     		
     		Preconditions.checkNotNull(pMethod);
     		Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
     		Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partioning requires a partitioner.");
    -
    +		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
    +		
    +		if (distribution != null) {
    +			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    +			for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
    +				Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]), "The types of key from the distribution and range partitioner are not equal.");
    --- End diff --
    
    code modified and rebase the new commit with previous one.( you must have stayed up late last night :) )


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660409
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distribution/CustomDistribution.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.distribution;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.types.Value;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +
    +/**
    + * The class is used to do the tests of range partition with customed data distribution.
    + */
    +public class CustomDistribution implements DataDistribution {
    --- End diff --
    
    I think it would be better to not implement a generic `CustomDistribution` for the tests. 
    Instead provide simple `DataDistribution` with built-in data and fixed parallelism. This will make the tests easier to read and reason about.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56669752
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.test.distribution.TestDataDist;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertTrue;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +				boolean checkPartition = true;
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if (s.f0 > (partitionIndex + 1) * 7 || s.f0 < (partitionIndex) * 7) {
    +						checkPartition = false;
    +					}
    +				}
    +				out.collect(checkPartition);
    +			}                        
    +		}).reduce(new ReduceFunction<Boolean>() {
    --- End diff --
    
    remove `ReduceFunction`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56962421
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    --- End diff --
    
    I think there is no suitable Collection DataSet to test range partitioning on a composite key. The easiest data would be two int attributes that are not correlated, i.e., are not both increasing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56664157
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/distribution/TestDataDist.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.distribution;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import java.io.IOException;
    +
    +/**
    + * The class is used to do the tests of range partition with customed data distribution.
    + */
    +public class TestDataDist implements DataDistribution {
    +
    +	private int dim;
    +
    +	public TestDataDist() {}
    +
    +	/**
    +	 * Constructor of the customized distribution for range partition.
    +	 * @param dim the number of the fields.
    +	 */
    +	public TestDataDist(int dim) {
    +		this.dim = dim;
    +	}
    +
    +	public int getParallelism() {
    +		return 3;
    +	}
    +
    +	@Override
    +	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +		if (dim == 1) {
    +			return new Integer[]{(bucketNum + 1) * 7};
    +		}
    +		return new Integer[]{(bucketNum + 1) * 7, (bucketNum) * 2 + 3};
    +	}
    +
    +	@Override
    +	public int getNumberOfFields() {
    +		return this.dim;
    +	}
    +	
    +	@Override
    +	public TypeInformation[] getKeyTypes() {
    +		return new TypeInformation[]{TypeExtractor.getForClass(Integer.class)};
    --- End diff --
    
    must depend on `dim`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-200321158
  
    @fhueske codes has been modified :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55660005
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---
    @@ -167,6 +174,8 @@ public PartitioningProperty getPartitioning() {
     		return this.customPartitioner;
     	}
     	
    +	public DataDistribution getDataDistribution() {return this.distribution;}
    --- End diff --
    
    Code style: please put the return statement in a new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55805402
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    +				IntValue key1;
    +				IntValue key2;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					key2 = new IntValue(s.f1);
    +					out.collect(new Tuple2<>(key1, key2));
    +				}
    +			}
    +		}), cd, 0, 1).groupBy(0).sum(0);
    --- End diff --
    
    Thanks a lot and i will modify the test code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-198405075
  
    I had a few comments, but we are getting closer :-) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-200339079
  
    Thanks for the fast update. The PR looks good. :-)
    I will merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55794734
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.test.distribution.CustomDistribution;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import static org.junit.Assert.assertEquals;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testRangeWithDistribution1() throws Exception{
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
    +				new Tuple3<>(1, 1, "Hi"),
    +				new Tuple3<>(1, 2, "Hello"),
    +				new Tuple3<>(1, 3, "Hello world"),
    +				new Tuple3<>(2, 4, "Hello world, how are you?"),
    +				new Tuple3<>(2, 5, "I am fine."),
    +				new Tuple3<>(3, 6, "Luke Skywalker"),
    +				new Tuple3<>(4, 7, "Comment#1"),
    +				new Tuple3<>(4, 8, "Comment#2"),
    +				new Tuple3<>(4, 9, "Comment#3"),
    +				new Tuple3<>(5, 10, "Comment#4"));
    +
    +		IntValue[][] keys = new IntValue[2][2];
    +
    +		env.setParallelism(3);
    +
    +		for (int i = 0; i < 2; i++)
    +		{
    +			for (int j = 0; j < 2; j++)
    +			{
    +				keys[i][j] = new IntValue(i + j);
    +			}
    +		}
    +
    +		CustomDistribution cd = new CustomDistribution(keys);
    +
    +		DataSet<Tuple2<IntValue, IntValue>> out1 = DataSetUtils.partitionByRange(input1.mapPartition(
    +				new MapPartitionFunction<Tuple3<Integer, Integer, String>, Tuple2<IntValue, IntValue>>() {
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Tuple2<IntValue, IntValue>> out) {
    +				IntValue key1;
    +				IntValue key2;
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					key1 = new IntValue(s.f0);
    +					key2 = new IntValue(s.f1);
    +					out.collect(new Tuple2<>(key1, key2));
    +				}
    +			}
    +		}), cd, 0, 1).groupBy(0).sum(0);
    --- End diff --
    
    Partition the inputs according the supplied distribution is the middle process in range partitioning, and i don't know how to get the information of records in every partition to validate its correctness. Can you give me some sugesstions? Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56961903
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.fail;
    +
    +
    +public class CustomDistributionITCase {
    +	
    +	@Test
    +	public void testPartitionWithDistribution1() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with one field according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(1);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Long, String> s : values) {
    +					if ((s.f0 - 1) / 7 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat()); 
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testRangeWithDistribution2() throws Exception{
    +		/*
    +		 * Test the record partitioned rightly with two fields according to the customized data distribution
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist dist = new TestDataDist(2);
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +			@Override
    +			public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +				return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +			}
    +		}), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
    +			@Override
    +			public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
    +				int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
    +
    +				for (Tuple3<Integer, Integer, String> s : values) {
    +					if (s.f0 <= partitionIndex * (partitionIndex + 1) / 2 ||
    +							s.f0 > (partitionIndex + 1) * (partitionIndex + 2) / 2 ||
    +							s.f1 - 1 != partitionIndex) {
    +						fail("Record was not correctly partitioned: " + s.toString());
    +					}
    +				}
    +			}
    +		});
    +
    +		result.output(new DiscardingOutputFormat());
    +		env.execute();
    +	}
    +
    +	/**
    +	 * The class is used to do the tests of range partition with customed data distribution.
    +	 */
    +	public static class TestDataDist implements DataDistribution {
    +
    +		private int dim;
    +
    +		public TestDataDist() {}
    +
    +		/**
    +		 * Constructor of the customized distribution for range partition.
    +		 * @param dim the number of the fields.
    +		 */
    +		public TestDataDist(int dim) {
    +			this.dim = dim;
    +		}
    +
    +		public int getParallelism() {
    +			if (dim == 1) {
    +				return 3;
    +			}
    +			return 6;
    +		}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +			if (dim == 1) {
    +				/*
    +				for the first test, the boundary is just like : 
    +				(0, 7]
    +				(7, 14]
    +				(14, 21]
    +				 */
    +
    +				return new Integer[]{(bucketNum + 1) * 7};
    +			}
    +			/*
    +			for the second test, the boundary is just like : 
    +			(0, 1], (0, 1]
    --- End diff --
    
    This is not a common distribution for composite range partition keys. It should look similar to this one:
    ```
    ( (1, 2), (1, 7) ]
    ( (1, 7), (3, 2) ]
    ( (3, 2), (4, 8) ]
    ...
    ```
    This would result in 4 partitions:
    
    - Partition 1: `(0,2), (1,1), (0,10), ...`
    - Partition 2: `(1,3), (2,20), (1,8), ...`
    - Partition 3: `(4,10), (3,5), ...`
    - Partition 4: `(5,3), (8,20), (4,12), ...`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1776#issuecomment-197360737
  
    Hi, I had a few more remarks, mostly regarding the tests. Can you also add the method `TypeInformation[] getKeyTypes()` to `DataDistribution` that I requested in the last review? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r55800752
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -82,6 +88,7 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		this.pKeys = pKeys;
     		this.partitionLocationName = partitionLocationName;
     		this.customPartitioner = customPartitioner;
    +		this.distribution = distribution;
    --- End diff --
    
    Other `pMethod` like `hashPartition` and `customPartition` don't need the `distribution` and the `distribution` use its default value: `null`, i think no check here also can make sense. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1776


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---