Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2016/03/30 04:54:55 UTC

[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1838

    [FLINK-2998] Support range partition comparison for multi input nodes.

    This PR implements range partition comparison for multi-input operations such as join and coGroup. The optimizer can now avoid re-partitioning the inputs if it finds that the data distributions supplied by the user are equal.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink flink-2998

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1838.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1838
    
----
commit 37e6147a829e50ba8a45c26f225e16e7695f6489
Author: gallenvara <ga...@126.com>
Date:   2016-03-29T14:36:21Z

    Support range partition comparison for multi input nodes.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-212300584
  
    @fhueske PR updated.
    I was a little confused when I wrote the tests. The original dataset is processed by a `map` operator to ensure that the type of the partition key matches the type of the boundaries in the supplied distribution. In the `DataDistribution` interface, the return type of the `getBucketBoundary` method is `Object[]`. My question is whether this can be changed to `Tuple`: when range partitioning on one field it would return a `Tuple1`, and on two fields a `Tuple2`. Likewise, in the `OutputEmitter`, the type of the keys would change from `Object[]` to `Tuple`, and the key would be compared with the boundary using a `Tuple` comparator. If this is possible, the boundaries in the distribution for the rangePartition test would be:
    `Tuple2<Integer, Long>[] boundaries = new Tuple2[]{
    new Tuple2(1, 1L),
    new Tuple2(3, 2L),
    ....
    }`
    This would make the test more succinct and direct.
    Another point of confusion is why partitionByHash and partitionByRange do not support KeySelectors that return a Tuple type, such as:
    ```
    public static class KeySelector3 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer,Long>> {
    		private static final long serialVersionUID = 1L;
    		@Override
    		public Tuple2<Integer,Long> getKey(Tuple3<Integer,Long,String> in) {
    			return new Tuple2<>(in.f0,in.f1);
    		}
    	}
    ```
    so that the following code cannot be run:
    ```
    DataSet<Tuple3<Integer,Long,String>> dataSet = ...;
    dataSet.partitionByRange(new KeySelector3());
    ```
    Can you explain this to me? Thanks!
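
To illustrate the key-versus-boundary comparison discussed in this comment, here is a minimal sketch that does not depend on Flink. It assigns a composite key to a range bucket by comparing it field by field against the upper boundaries of the buckets. The names `RangeBucketSketch`, `compareKeys`, and `findBucket` are hypothetical and are not Flink APIs; the actual logic in Flink's output emitter may differ.

```java
final class RangeBucketSketch {

    // Compares two composite keys field by field (lexicographic order).
    // Assumption: corresponding fields have the same, mutually comparable type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareKeys(Object[] key, Object[] boundary) {
        int n = Math.min(key.length, boundary.length);
        for (int i = 0; i < n; i++) {
            int cmp = ((Comparable) key[i]).compareTo(boundary[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    // Binary search for the first bucket whose upper boundary is >= key.
    // Keys larger than every boundary fall into the last bucket,
    // whose index is boundaries.length.
    static int findBucket(Object[] key, Object[][] boundaries) {
        int lo = 0;
        int hi = boundaries.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareKeys(key, boundaries[mid]) <= 0) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo;
    }
}
```

With `Object[]` keys the field types are only checked at runtime by `compareTo`, which is one reason a typed `Tuple` representation might make tests easier to read.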



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59750088
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java ---
    @@ -142,9 +142,9 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
     			else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
     					produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
    --- End diff --
    
    Add checks that both distributions are not `null`.
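
One possible reading of the requested check, as a minimal sketch independent of Flink: two range partitionings are only treated as sharing a distribution if both distributions are present and equal. The class and method names are hypothetical, and the distributions are typed as `Object` only to keep the example self-contained.

```java
final class DistributionCheckSketch {

    // Returns true only if both distributions are non-null and equal.
    // Note: java.util.Objects.equals(null, null) would return true,
    // which is not what is wanted when no distribution was supplied.
    static boolean equalNonNull(Object dist1, Object dist2) {
        return dist1 != null && dist2 != null && dist1.equals(dist2);
    }
}
```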



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-203307525
  
    I'm currently on vacation. Will have a closer look when I'm back in about a week.
    
    I am not sure that we need to touch the Join and CoGroup operators to pass the distributions. The optimizer is able to get this from the GlobalProperties to decide whether the partitionings are valid and equivalent.



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-211923989
  
    Hi @gallenvara, thanks for the update! I tried it locally and it worked as expected. 
    
    I would like two more test methods though, to ensure that everything works end-to-end.
    Could you add one test method to `JoinITCase` which basically extends `testeUDFJoinOnTuplesWithMultipleKeyFieldPositions()` and uses range partitioning? For that you should provide a DataDistribution and set the parallelism to 4 on the environment. 
    Please do the same with `CoGroupITCase.testCoGroupWithMultipleKeyFieldsWithFieldSelector()`.
    
    After that we can merge the PR. Thanks, Fabian



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-210049036
  
    Thanks for the update @gallenvara. Aside from a few comments the PR looks good. No worries about the `GlobalProperties`. The internals of the optimizer are not straightforward ;-). 
    Thanks for adding the tests for the distribution / key length!




Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60396134
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java ---
    @@ -104,6 +112,51 @@ public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception
     	}
     
     	@Test
    +	public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions2() throws Exception {
    --- End diff --
    
    Comments of the `CoGroupITCase` apply here as well.



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59232583
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.distributions.DataDistribution;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +
    +public class TestDistribution implements DataDistribution {
    --- End diff --
    
    I think this class can be simplified a lot for our test purposes. We do not need to provide proper implementations for most of the functionality (`getParallelism`, `getBucketBoundary`, `write`, `read`). `equals` can also be mocked to check for a single value.
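
A minimal sketch of what such a simplified test class could look like, assuming equality is decided by a single integer value passed to the constructor. To stay self-contained it does not implement Flink's `DataDistribution` interface; a real test class would implement it and stub out the remaining methods. The class name `SimpleTestDistribution` is hypothetical.

```java
final class SimpleTestDistribution {

    // The only state: two instances are equal exactly when their ids match.
    private final int id;

    SimpleTestDistribution(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof SimpleTestDistribution
                && ((SimpleTestDistribution) obj).id == this.id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }
}
```

Two instances created with the same value then compare as equal, and instances created with different values do not, which is enough to exercise both the compatible and the incompatible case in a unit test.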



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60395706
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -797,4 +873,46 @@ public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, Stri
     			}
     		}
     	}
    +
    +	public static class TestDistribution implements DataDistribution {
    +		public Integer boundaries[][] = new Integer[][]{
    +				new Integer[]{2, 2},
    +				new Integer[]{5, 4},
    +				new Integer[]{10, 12},
    +				new Integer[]{21, 6}
    +		};
    +
    +		public TestDistribution() {}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +			return boundaries[bucketNum];
    +		}
    +
    +		@Override
    +		public int getNumberOfFields() {
    +			return 2;
    +		}
    +
    +		@Override
    +		public TypeInformation[] getKeyTypes() {
    +			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
    --- End diff --
    
    Change to `Integer, Long` to support the original data type.



Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-203350617
  
    @fhueske Yes, `TwoInputNode` rebuilds the channels, and the `child` nodes don't have the `data distribution` information. I have added this information to them and updated the PR.



Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-203216796
  
    @fhueske @ChengXiangLi Can you please help with the review? :)



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59232837
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java ---
    @@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
     			final FieldList keysLeft = new FieldList(1, 4);
     			final FieldList keysRight = new FieldList(3, 1);
     
    -			SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
    +			SortMergeInnerJoinDescriptor descr1 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
    --- End diff --
    
    Why did you rename this variable? I think it can be used for the new tests as well, no?



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59231845
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistribution.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.DiscardingOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.utils.DataSetUtils;
    +import org.apache.flink.optimizer.plan.*;
    +import org.apache.flink.optimizer.util.CompilerTestBase;
    +import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class JoinWithDistribution extends CompilerTestBase {
    --- End diff --
    
    Rename to `JoinWithDistributionTest`



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-212413005
  
    Thanks for the fast update @gallenvara! 
    PR looks good and can be merged if tests pass.




Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59232978
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java ---
    @@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
     				GlobalProperties propsRight = new GlobalProperties();
     				propsRight.setCustomPartitioned(keysRight, part);
     				
    -				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
    +				assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
    +			}
    +
    +			TestDistribution dist1 = new TestDistribution(1);
    +			TestDistribution dist2 = new TestDistribution(1);
    +			SortMergeInnerJoinDescriptor descr2 = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
    +			
    +			// test compatible range partitioning
    +			{
    +				Ordering ordering1 = new Ordering();
    +				for (int field : keysLeft) {
    +					ordering1.appendOrdering(field, null, Order.ASCENDING);
    +				}
    +				Ordering ordering2 = new Ordering();
    +				for (int field : keysRight) {
    +					ordering2.appendOrdering(field, null, Order.ASCENDING);
    +				}
    +
    +				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
    +				reqLeft.setRangePartitioned(ordering1, dist1);
    +				RequestedGlobalProperties reqRigth = new RequestedGlobalProperties();
    --- End diff --
    
    typo: `reqRigth` -> `reqRight`
    copy-pasted to the CoGroup test as well.



Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1838



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59749772
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
    @@ -95,6 +97,32 @@ public int partition(Object key, int numPartitions) {
     				
     				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
     			}
    +
    +			TestDistribution dist1 = new TestDistribution(1);
    +			TestDistribution dist2 = new TestDistribution(1);
    +			
    +			// test compatible range partitioning
    +			{
    --- End diff --
    
    Can you add a check with two keys, one DESC and one ASC? 
    Orders of both should be the same to check if it is correctly identified as compatible.
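
The condition this test is meant to exercise can be sketched without Flink as follows: the i-th key on the left input and the i-th key on the right input must use the same sort direction. The enum `Order` below is a hypothetical stand-in for Flink's own `Order` type, and `sameDirections` is not a Flink method.

```java
import java.util.Arrays;

final class OrderCompatibilitySketch {

    // Stand-in for the sort direction of a single key field.
    enum Order { ASCENDING, DESCENDING }

    // Two range partitionings can only match if the directions agree
    // position by position, for example (DESC, ASC) on both inputs.
    static boolean sameDirections(Order[] left, Order[] right) {
        return Arrays.equals(left, right);
    }
}
```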



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59229736
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---
    @@ -83,6 +83,10 @@ public void setHashPartitioned(FieldList partitionedFields) {
     		this.partitioningFields = partitionedFields;
     		this.ordering = null;
     	}
    +	
    +	public void setDistribution(DataDistribution distribution) {
    --- End diff --
    
    Distribution should not be set from outside.



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59751421
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
     		env.execute();
     	}
     
    +	@Test
    +	public void testPartitionKeyLessDistribution() throws Exception{
    +		/*
    +		 * Test the number of keys less than the number of distribution fields
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist2 dist = new TestDataDist2();
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils
    +				.partitionByRange(input1, dist, 0)
    +				.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +
    +								  @Override
    --- End diff --
    
    Can you adjust the indentation to be as in the other test methods?



Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59819652
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -84,8 +84,8 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
     		
     		if (distribution != null) {
    -			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    -			Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
    +			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution.");
    +			Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution.");
    --- End diff --
    
    :) I will improve my poor English skills.



Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-210255818
  
    @fhueske PR updated.



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-212608296
  
    Merging this PR



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59752212
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -84,8 +84,8 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
     		
     		if (distribution != null) {
    -			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    -			Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
    +			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution.");
    --- End diff --
    
    Update the message to "The distribution must provide at least as many fields as flat key fields are specified."
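
The two preconditions in the diff above can be sketched in plain Java as follows. Key types are modelled as `Class<?>` values instead of Flink's `TypeInformation`, and the class and method names are hypothetical. The first message is the wording suggested in this comment; the second is taken from the diff.

```java
import java.util.Arrays;

final class KeyPrefixCheckSketch {

    // Accepts the keys only if there are no more keys than distribution
    // fields and the key types form a prefix of the distribution types.
    static void checkKeys(Class<?>[] keyTypes, Class<?>[] distributionTypes) {
        if (keyTypes.length > distributionTypes.length) {
            throw new IllegalArgumentException(
                "The distribution must provide at least as many fields as flat key fields are specified.");
        }
        Class<?>[] prefix = Arrays.copyOfRange(distributionTypes, 0, keyTypes.length);
        if (!Arrays.equals(keyTypes, prefix)) {
            throw new IllegalArgumentException(
                "The type array of the partition key should be prefix of the type array of the distribution.");
        }
    }
}
```

Checking the length first matters: `Arrays.copyOfRange` pads with `null` when the requested range is longer than the source array, so without the first check an oversized key list would fail with the less helpful type message.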



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59233074
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
    @@ -35,7 +37,7 @@ public void checkCompatiblePartitionings() {
     			final FieldList keysLeft = new FieldList(1, 4);
     			final FieldList keysRight = new FieldList(3, 1);
     			
    -			CoGroupDescriptor descr = new CoGroupDescriptor(keysLeft, keysRight);
    +			CoGroupDescriptor descr1 = new CoGroupDescriptor(keysLeft, keysRight);
    --- End diff --
    
    Can't we use this object for the new test as well? Then we would not need to rename it.



Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59752067
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java ---
    @@ -175,6 +175,77 @@ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
     		env.execute();
     	}
     
    +	@Test
    +	public void testPartitionKeyLessDistribution() throws Exception{
    +		/*
    +		 * Test the number of keys less than the number of distribution fields
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist2 dist = new TestDataDist2();
    +
    +		env.setParallelism(dist.getParallelism());
    +
    +		DataSet<Boolean> result = DataSetUtils
    +				.partitionByRange(input1, dist, 0)
    +				.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
    +
    +								  @Override
    +								  public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
    +									  int pIdx = getRuntimeContext().getIndexOfThisSubtask();
    +
    +									  for (Tuple3<Integer, Long, String> s : values) {
    +										  boolean correctlyPartitioned = true;
    +										  if (pIdx == 0) {
    +											  Integer[] upper = dist.boundaries[0];
    +											  if (s.f0.compareTo(upper[0]) > 0) {
    +												  correctlyPartitioned = false;
    +											  }
    +										  } else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
    +											  Integer[] lower = dist.boundaries[pIdx - 1];
    +											  Integer[] upper = dist.boundaries[pIdx];
    +											  if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
    +												  correctlyPartitioned = false;
    +											  }
    +										  } else {
    +											  Integer[] lower = dist.boundaries[pIdx - 1];
    +											  if ((s.f0.compareTo(lower[0]) <= 0)) {
    +												  correctlyPartitioned = false;
    +											  }
    +										  }
    +
    +										  if (!correctlyPartitioned) {
    +											  fail("Record was not correctly partitioned: " + s.toString());
    +										  }
    +									  }
    +								  }
    +							  }
    +				);
    +
    +		result.output(new DiscardingOutputFormat<Boolean>());
    +		env.execute();
    +	}
    +
    +	@Test(expected = IllegalArgumentException.class)
    +	public void testPartitionMoreThanDistribution() throws Exception{
    +		/*
    +		 * Test the number of keys larger than the number of distribution fields
    +		 */
    +
    +		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    +
    +		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
    +		final TestDataDist2 dist = new TestDataDist2();
    +
    +		DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils
    +				.partitionByRange(input1, dist, 0, 1, 2);
    +
    +		result.output(new DiscardingOutputFormat<Tuple3<Integer, Long, String>>());
    --- End diff --
    
    I think you can remove this code. The test should fail at `partitionByRange(...)`, right?



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59750107
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java ---
    @@ -151,10 +151,10 @@ public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlob
     		else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
     				produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
    --- End diff --
    
    Add checks that both distributions are not `null`.
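
A minimal sketch of the kind of null-safe check requested here. The class `RangeCompatibilitySketch`, the stand-in type `Boundaries`, and the method `sameDistribution` are hypothetical names used only for illustration; they are not part of Flink's `CoGroupDescriptor` or `DataDistribution`.

```java
import java.util.Arrays;

final class RangeCompatibilitySketch {

    /** Hypothetical stand-in for a user-supplied distribution (not Flink's DataDistribution). */
    static final class Boundaries {
        private final int[] upperBounds;

        Boundaries(int... upperBounds) {
            this.upperBounds = upperBounds.clone();
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Boundaries
                    && Arrays.equals(upperBounds, ((Boundaries) obj).upperBounds);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(upperBounds);
        }
    }

    /**
     * Treats two range partitionings as equally distributed only if both
     * distributions are present and equal. A missing distribution on either
     * side yields false instead of a NullPointerException.
     */
    static boolean sameDistribution(Boundaries left, Boundaries right) {
        return left != null && right != null && left.equals(right);
    }
}
```

With this shape, two inputs that are both range partitioned but carry no user-supplied distribution are reported as not comparable, which is the conservative outcome.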



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60395647
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -797,4 +873,46 @@ public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, Stri
     			}
     		}
     	}
    +
    +	public static class TestDistribution implements DataDistribution {
    +		public Integer boundaries[][] = new Integer[][]{
    +				new Integer[]{2, 2},
    +				new Integer[]{5, 4},
    +				new Integer[]{10, 12},
    +				new Integer[]{21, 6}
    +		};
    +
    +		public TestDistribution() {}
    +
    +		@Override
    +		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
    +			return boundaries[bucketNum];
    +		}
    +
    +		@Override
    +		public int getNumberOfFields() {
    +			return 2;
    +		}
    +
    +		@Override
    +		public TypeInformation[] getKeyTypes() {
    +			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
    +		}
    +
    +		@Override
    +		public void write(DataOutputView out) throws IOException {
    +
    +		}
    +
    +		@Override
    +		public void read(DataInputView in) throws IOException {
    +
    +		}
    +
    +		@Override
    +		public boolean equals(Object obj) {
    +			// The test is running with same distribution, so return true directly
    +			return true; 
    --- End diff --
    
    That is true. 
    Could you change it to `obj instanceof TestDistribution` nonetheless? Sometimes test code is copied and this could be a bit dangerous ;-)
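
As a rough illustration of the suggestion, the `equals` method could look like the sketch below. `TestDistributionSketch` is a hypothetical, simplified stand-in that omits the `DataDistribution` interface so the snippet compiles on its own; the `hashCode` override is an addition made here to keep the `equals`/`hashCode` contract and is not something the review asked for.

```java
final class TestDistributionSketch {

    // Same boundary values as the TestDistribution in the diff above.
    final Integer[][] boundaries = { {2, 2}, {5, 4}, {10, 12}, {21, 6} };

    @Override
    public boolean equals(Object obj) {
        // All instances share the same hard-coded boundaries, so any other
        // instance of this class is equal; unrelated objects and null are not.
        return obj instanceof TestDistributionSketch;
    }

    @Override
    public int hashCode() {
        // Equal objects must have equal hash codes.
        return TestDistributionSketch.class.hashCode();
    }
}
```

Unlike `return true`, this version stays correct if the class is copied into a test that compares it against a different distribution type.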



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-212409952
  
    @fhueske Thanks a lot for the explanation; the relevant code has been modified.



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60396078
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
     	}
     
     	@Test
    +	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception {
    --- End diff --
    
    Can you change the method name to `testCoGroupWithRangePartitioning` and move the method to the bottom of the file (before the helper classes)?



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59229663
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---
    @@ -454,6 +463,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
     							}
     						}
     						
    +						if (child2.getInputs().iterator().hasNext()) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-212402787
  
    Hi @gallenvara, the tests look good. I left a few comments to make the test code more concise.
    
    Regarding your questions:
    1) Yes, we could change the `Object[]` to `Tuple`. However, I am not sure whether this would confuse users: `Tuple` is usually used as a data type in programs, while in `DataDistribution` it would be a holder for composite keys.
    2) The problem with returning `Tuple` from a `KeySelector` is that `Tuple` does not implement `Comparable`, as required by `partitionByHash` and `partitionByRange`.
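
To make the composite-key point concrete, here is a small sketch of comparing an `Object[]` key against an `Object[]` bucket boundary field by field. It assumes every field implements `Comparable` and that fields are compared left to right; `CompositeKeySketch` and `compareKeys` are hypothetical names, and this is not meant to describe how Flink's runtime actually compares keys.

```java
final class CompositeKeySketch {

    /**
     * Lexicographic comparison of a composite key against a boundary.
     * Only the fields present in both arrays are compared, so a key with
     * fewer fields is compared against a prefix of the boundary.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compareKeys(Object[] key, Object[] boundary) {
        int fields = Math.min(key.length, boundary.length);
        for (int i = 0; i < fields; i++) {
            int cmp = ((Comparable) key[i]).compareTo(boundary[i]);
            if (cmp != 0) {
                return cmp; // first differing field decides the order
            }
        }
        return 0; // all compared fields are equal
    }
}
```

The array itself never needs to be `Comparable`; only its elements do, which is why a plain `Object[]` can serve as the holder here.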



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-208712697
  
    @fhueske Thanks a lot for your advice. PR updated. Please forgive my limited understanding of the logic of `GlobalProperties`. I added tests to `CustomDistributionITCase` for the cases where the number of partition keys is smaller or larger than the number of fields in the distribution.



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59229638
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java ---
    @@ -427,6 +428,14 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
     						}
     					}
     					
    +					if (child1.getInputs().iterator().hasNext()) {
    --- End diff --
    
    This change is not necessary. The data distribution should be set within `GlobalProperties`. I'll point you to the correct places.



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1838#issuecomment-208421293
  
    `GlobalProperties` must copy the `distribution` itself. There are three places where this must be added:
    1) `filterBySemanticProperties()`, line 314: `gp.distribution = this.distribution;`
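
The idea of carrying the distribution along whenever the properties object is copied can be sketched as follows. `PropertiesCopySketch` is a hypothetical, heavily simplified stand-in with only two fields; it is not Flink's `GlobalProperties`, and the string value used for the partitioning is an illustration only.

```java
final class PropertiesCopySketch {

    String partitioning = "RANDOM_PARTITIONED";
    Object distribution; // user-supplied distribution, may be null

    void setRangePartitioned(Object distribution) {
        this.partitioning = "RANGE_PARTITIONED";
        this.distribution = distribution;
    }

    /** Every method that derives a new properties object must copy the distribution too. */
    PropertiesCopySketch copy() {
        PropertiesCopySketch gp = new PropertiesCopySketch();
        gp.partitioning = this.partitioning;
        gp.distribution = this.distribution; // without this line the distribution is silently lost
        return gp;
    }
}
```

If the assignment is missing in any such method, the derived properties still claim range partitioning but can no longer be compared by distribution.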



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60395484
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
     	}
     
     	@Test
    +	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception {
    +		/*
    +		 * UDF Join on tuples with multiple key field positions and same customized distribution
    +		 */
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
    +				.map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {
    --- End diff --
    
    Can you keep the original data type, i.e., not convert the last field to Integer? 
    1) It will check whether the range partitioning handles different types.
    2) It will be more concise (no map functions and no additional CoGroupFunction).



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59233583
  
    --- Diff: flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java ---
    @@ -93,7 +95,34 @@ public int partition(Object key, int numPartitions) {
     				GlobalProperties propsRight = new GlobalProperties();
     				propsRight.setCustomPartitioned(keysRight, part);
     				
    -				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
    +				assertTrue(descr1.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
    +			}
    +
    +			TestDistribution dist1 = new TestDistribution(1);
    +			TestDistribution dist2 = new TestDistribution(1);
    +			CoGroupDescriptor descr2 = new CoGroupDescriptor(keysLeft, keysRight);
    +			
    +			// test compatible range partitioning
    --- End diff --
    
    We also need tests for a range partitioning with two (or more) keys and tests for range partitioning with different orders.
    The tests should check for compatible and incompatible partitionings.
    The same applies to the `JoinGlobalPropertiesCompatibilityTest`.



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r59752679
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -84,8 +84,8 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
     		
     		if (distribution != null) {
    -			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    -			Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
    +			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution.");
    +			Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution.");
    --- End diff --
    
    Please split this line and update the message to "The types of the flat key fields must be a equal of the types of the fields of the distribution."



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60229754
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java ---
    @@ -84,8 +84,8 @@ public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customP
     		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
     		
     		if (distribution != null) {
    -			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
    -			Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
    +			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The number of key fields in range partitioner should be less than the number in the distribution.");
    +			Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())), "The type array of the partition key should be prefix of the type array of the distribution.");
    --- End diff --
    
    Oh, maybe I should improve my English skills. The message should read like: `"The types of the flat key fields must be equal to the types of the fields of the distribution."`
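
For readers following the two checks in the diff above, the prefix logic can be sketched in isolation. This sketch uses plain `Class<?>` arrays instead of Flink's `TypeInformation`, and `KeyPrefixCheckSketch` and `keysMatchDistribution` are hypothetical names; in the actual patch the two conditions are passed to `Preconditions.checkArgument` with separate messages.

```java
import java.util.Arrays;

final class KeyPrefixCheckSketch {

    /**
     * Returns true if the partition keys are compatible with the distribution:
     * there are at most as many key fields as distribution fields, and the key
     * types equal the leading types of the distribution.
     */
    static boolean keysMatchDistribution(Class<?>[] keyTypes, Class<?>[] distributionTypes) {
        if (keyTypes.length > distributionTypes.length) {
            return false; // more keys than the distribution describes
        }
        // Compare the key types against the same-length prefix of the distribution types.
        Class<?>[] prefix = Arrays.copyOfRange(distributionTypes, 0, keyTypes.length);
        return Arrays.equals(keyTypes, prefix);
    }
}
```

The length check has to come first: `Arrays.copyOfRange` pads with `null` when the requested range is longer than the source array, so the type comparison alone would report a mismatch for a less obvious reason.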



[GitHub] flink pull request: [FLINK-2998] Support range partition compariso...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1838#discussion_r60395945
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java ---
    @@ -301,6 +309,51 @@ public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception
     	}
     
     	@Test
    +	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector2() throws Exception {
    +		/*
    +		 * UDF Join on tuples with multiple key field positions and same customized distribution
    +		 */
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataSet<Tuple5<Integer, Long, Integer, String, Integer>> ds1 = CollectionDataSets.get5TupleDataSet(env)
    +				.map(new MapFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Integer>>() {
    +					@Override
    +					public Tuple5<Integer, Long, Integer, String, Integer> map(Tuple5<Integer, Long, Integer, String, Long> value) throws Exception {
    +						return new Tuple5<>(value.f0, value.f1, value.f2, value.f3, value.f4.intValue());
    +					}
    +				});
    +
    +		DataSet<Tuple3<Integer, Integer, String>> ds2 = CollectionDataSets.get3TupleDataSet(env)
    +				.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
    +					@Override
    +					public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
    +						return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
    +					}
    +				});
    +		
    +		env.setParallelism(4);
    +		TestDistribution testDis = new TestDistribution();
    +		DataSet<Tuple3<Integer, Long, String>> coGrouped =
    +				DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
    +				.coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
    +				.where(0, 4)
    +				.equalTo(0, 1)
    +				.with(new Tuple5Tuple3CoGroup2());
    --- End diff --
    
    Please use the same `CoGroupFunction` as in the original test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---