Posted to issues@flink.apache.org by chiwanpark <gi...@git.apache.org> on 2016/02/04 13:18:39 UTC

[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

GitHub user chiwanpark opened a pull request:

    https://github.com/apache/flink/pull/1585

    [FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation.

    This PR contains the following changes:
    
    * Add `sortPartition` methods which receive a `KeySelector` instance or a Scala lambda function
    * Add `SortPartitionOperator` constructor which takes `Keys.SelectorFunctionKeys`
    * Add `getFlatFields` method to `SortPartitionOperator` class for `Keys.SelectorFunctionKeys`
    * Add some test cases

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chiwanpark/flink FLINK-3234

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1585.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1585
    
----
commit 8dc8845449b494f1495965015d96f114551abc31
Author: Chiwan Park <ch...@apache.org>
Date:   2016-02-04T11:46:10Z

    [FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-181773503
  
    The refactoring looks good, @chiwanpark.
    I have just a few minor remarks. The PR can be resolved after these have been addressed.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870870
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java ---
    @@ -169,6 +169,38 @@ public void testSortPartitionWithExpressionKeys4() {
     		tupleDs.sortPartition("f3", Order.ASCENDING);
     	}
     
    +	@Test
    --- End diff --
    
    I suggest adding more tests:
    - mixing `SelectorFunctionKeys` and `ExpressionKeys` -> Should not work
    - use a `KeySelector` that returns a composite type, e.g., a `Tuple3`



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870968
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala ---
    @@ -39,12 +41,25 @@ class PartitionSortedDataSet[T: ClassTag](set: SortPartitionOperator[T])
         this
       }
     
    -/**
    - * Appends the given field and order to the sort-partition operator.
    - */
    +  /**
    +   * Appends the given field and order to the sort-partition operator.
    +   */
       override def sortPartition(field: String, order: Order): DataSet[T] = {
         this.set.sortPartition(field, order)
         this
       }
     
    +  /**
    +    * Appends the given field and order to the sort-partition operator.
    +    */
    +  override def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T] = {
    --- End diff --
    
    Do not allow chaining of `KeySelector`s. Make sure different key types are not mixed.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52281965
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,24 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the an extracted key in the specified order.
    +	 * DataSet can be sorted on multiple values by returning a tuple from the KeySelector.
    +	 *
    +	 * Note that any key extraction methods cannot be chained with the KeySelector. To sort the
    --- End diff --
    
    "Note that any key extraction methods cannot be ..." -> "Note that no additional sort keys can be appended to a KeySelector."



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870482
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -89,6 +100,22 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     		return this;
     	}
     
    +	/**
    +	 * Appends an additional sort order with the specified field in the specified order to the
    +	 * local partition sorting of the DataSet.
    +	 *
    +	 * @param keyExtractor The KeySelector function which extracts the key value of the additional
    +	 *                     sort order of the local partition sorting.
    +	 * @param order The order of the additional sort order of the local partition sorting.
    +	 * @return The DataSet with sorted local partitions.
    +	 */
    +	public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) {
    --- End diff --
    
    I would not allow chaining.
    In addition, we must make sure that `ExpressionKeys` and `SelectorFunctionKeys` are not mixed. So you can either use a single key selector or one or more expression keys.
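
The rule stated above (either a single key selector, or one or more expression keys, never a mix) can be sketched in plain Java. This is only an illustration under assumptions: `SortKeyRulesSketch` and its methods are hypothetical names, it uses `IllegalStateException` where the patch uses Flink's `InvalidProgramException`, and it tracks keys as strings rather than real Flink key objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator that collects sort keys; not Flink code.
final class SortKeyRulesSketch {
    private final List<String> expressionKeys = new ArrayList<>();
    private boolean useKeySelector = false;

    // Start the sort with a single key selector (modelled here only by a flag).
    static SortKeyRulesSketch startWithKeySelector() {
        SortKeyRulesSketch spec = new SortKeyRulesSketch();
        spec.useKeySelector = true;
        return spec;
    }

    // Start the sort with an expression key such as "f0".
    static SortKeyRulesSketch startWithExpression(String field) {
        SortKeyRulesSketch spec = new SortKeyRulesSketch();
        spec.expressionKeys.add(field);
        return spec;
    }

    // Expression keys may be chained, but only onto other expression keys.
    SortKeyRulesSketch thenExpression(String field) {
        if (useKeySelector) {
            throw new IllegalStateException(
                "Expression keys cannot be appended after a key selector");
        }
        expressionKeys.add(field);
        return this;
    }

    // A key selector can never be appended to an existing sort.
    SortKeyRulesSketch thenKeySelector() {
        throw new IllegalStateException("A key selector cannot be chained");
    }

    List<String> expressionKeys() {
        return expressionKeys;
    }

    public static void main(String[] args) {
        // Chaining expression keys is allowed.
        System.out.println(startWithExpression("f0").thenExpression("f1").expressionKeys());
        // Mixing a key selector with an expression key is rejected.
        try {
            startWithKeySelector().thenExpression("f1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Rejecting the invalid combinations when the sort is declared, rather than when it runs, keeps the error close to the line of user code that caused it.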



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870995
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SortPartitionITCase.scala ---
    @@ -166,6 +166,23 @@ class SortPartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestB
         TestBaseUtils.compareResultAsText(result.asJava, expected)
       }
     
    +  @Test
    +  def testSortPartitionWithKeySelector(): Unit = {
    --- End diff --
    
    Add the same tests as for Java.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52099909
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -79,16 +112,33 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     	 * local partition sorting of the DataSet.
     	 *
     	 * @param field The field expression referring to the field of the additional sort order of
    -	 *                 the local partition sorting.
    -	 * @param order The order  of the additional sort order of the local partition sorting.
    +	 *              the local partition sorting.
    +	 * @param order The order of the additional sort order of the local partition sorting.
     	 * @return The DataSet with sorted local partitions.
     	 */
     	public SortPartitionOperator<T> sortPartition(String field, Order order) {
    +		if (useKeySelector) {
    +			throw new InvalidProgramException("Expression keys cannot be appended after selector function keys");
    +		}
    +
     		int[] flatOrderKeys = getFlatFields(field);
     		this.appendSorting(flatOrderKeys, order);
     		return this;
     	}
     
    +	/**
    +	 * Appends an additional sort order with the specified field in the specified order to the
    +	 * local partition sorting of the DataSet.
    +	 *
    +	 * @param keyExtractor The KeySelector function which extracts the key value of the additional
    +	 *                     sort order of the local partition sorting.
    +	 * @param order        The order of the additional sort order of the local partition sorting.
    +	 * @return The DataSet with sorted local partitions.
    +	 */
    +	public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) {
    --- End diff --
    
    Oh yes, you are right! 
    Can you update the JavaDocs of this method and explain that chaining is not possible and how to work around it?
    Thanks



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51993269
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java ---
    @@ -169,6 +169,38 @@ public void testSortPartitionWithExpressionKeys4() {
     		tupleDs.sortPartition("f3", Order.ASCENDING);
     	}
     
    +	@Test
    +	public void testSortPartitionWithKeySelector1() {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    +
    +		// should work
    +		try {
    +			tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() {
    +				@Override
    +				public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
    +					return value.f0;
    --- End diff --
    
    Can you return a field other than `f0` here?



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52281750
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,24 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the an extracted key in the specified order.
    +	 * DataSet can be sorted on multiple values by returning a tuple from the KeySelector.
    --- End diff --
    
    "The DataSet can be ...", add "The"



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870368
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -59,6 +62,14 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     		this.appendSorting(flatOrderKeys, sortOrder);
     	}
     
    +	public <K> SortPartitionOperator(DataSet<T> dataSet, Keys.SelectorFunctionKeys<T, K> sortKey, Order sortOrder, String sortLocationName) {
    +		super(dataSet, dataSet.getType());
    +		this.sortLocationName = sortLocationName;
    +
    +		int[] flatOrderKeys = getFlatFields(sortKey);
    --- End diff --
    
    This does not work.
    `SelectorFunctionKeys` need to be handled completely differently. In short, you need to inject a `MapFunction<IN, Tuple2<KEY, IN>>` that extracts the key and emits a `Tuple2` with the extracted key and the original input record.
    The data set is then sorted on the first tuple field (the key field) and the records are unwrapped after the sort by another `MapFunction<Tuple2<KEY, IN>, IN>`. Have a look at how the `PartitionOperator` handles `KeySelector`s.
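
The wrap, sort, unwrap sequence described in this comment can be sketched without Flink. The following is a minimal plain-Java illustration, not the operator's implementation: `WrapSortUnwrapSketch` and `sortByExtractedKey` are hypothetical names, `Map.Entry` stands in for `Tuple2<KEY, IN>`, and an in-memory list stands in for one partition.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; none of these names are Flink classes.
final class WrapSortUnwrapSketch {

    static <IN, KEY extends Comparable<KEY>> List<IN> sortByExtractedKey(
            List<IN> partition, Function<IN, KEY> keyExtractor, boolean ascending) {

        // Step 1: pair every record with its extracted key (the "wrapping" map).
        List<Map.Entry<KEY, IN>> wrapped = new ArrayList<>();
        for (IN record : partition) {
            wrapped.add(new SimpleEntry<>(keyExtractor.apply(record), record));
        }

        // Step 2: sort the pairs on the key only.
        Comparator<Map.Entry<KEY, IN>> byKey = Map.Entry.comparingByKey();
        wrapped.sort(ascending ? byKey : byKey.reversed());

        // Step 3: drop the key again (the "unwrapping" map).
        List<IN> unwrapped = new ArrayList<>();
        for (Map.Entry<KEY, IN> pair : wrapped) {
            unwrapped.add(pair.getValue());
        }
        return unwrapped;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("pear", "fig", "banana");
        // The key (string length) is computed from the record, not stored in it.
        System.out.println(sortByExtractedKey(partition, String::length, true));
        // prints [fig, pear, banana]
    }
}
```

Because the sort only ever looks at the first component of the pair, the key can be any computed value, which is why a field-position based approach does not carry over to key selectors.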



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52281721
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,24 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the an extracted key in the specified order.
    --- End diff --
    
    "...the DataSet on the **an** extracted key...", remove "an"



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870593
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java ---
    @@ -169,6 +169,38 @@ public void testSortPartitionWithExpressionKeys4() {
     		tupleDs.sortPartition("f3", Order.ASCENDING);
     	}
     
    +	@Test
    +	public void testSortPartitionWithKeySelector1() {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    +
    +		// should work
    +		try {
    +			tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() {
    +				@Override
    +				public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
    +					return value.f0;
    --- End diff --
    
    The test only works because the KeySelector function returns the first field of the input data. If you change this to `value.f1` the test will fail.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-179843708
  
    Thanks for the review, @fhueske! I'll address your comments.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52099968
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java ---
    @@ -169,6 +169,72 @@ public void testSortPartitionWithExpressionKeys4() {
     		tupleDs.sortPartition("f3", Order.ASCENDING);
     	}
     
    +	@Test
    +	public void testSortPartitionWithKeySelector1() {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    +
    +		// should work
    +		try {
    +			tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() {
    +				@Override
    +				public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
    +					return value.f0;
    +				}
    +			}, Order.ASCENDING);
    +		} catch (Exception e) {
    +			Assert.fail();
    +		}
    +	}
    +
    +	@Test(expected = InvalidProgramException.class)
    +	public void testSortPartitionWithKeySelector2() {
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo);
    +
    +		// must not work
    +		tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long[]>() {
    +			@Override
    +			public Long[] getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
    +				return value.f3;
    +			}
    +		}, Order.ASCENDING);
    +	}
    +
    +	@Test(expected = InvalidProgramException.class)
    --- End diff --
    
    Can you add a test that first uses KeySelectors and then ExpressionKeys?



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52282174
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -79,58 +119,41 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     	 * local partition sorting of the DataSet.
     	 *
     	 * @param field The field expression referring to the field of the additional sort order of
    -	 *                 the local partition sorting.
    -	 * @param order The order  of the additional sort order of the local partition sorting.
    +	 *              the local partition sorting.
    +	 * @param order The order of the additional sort order of the local partition sorting.
     	 * @return The DataSet with sorted local partitions.
     	 */
     	public SortPartitionOperator<T> sortPartition(String field, Order order) {
    -		int[] flatOrderKeys = getFlatFields(field);
    -		this.appendSorting(flatOrderKeys, order);
    +		if (useKeySelector) {
    +			throw new InvalidProgramException("Expression keys cannot be appended after a KeySelector");
    +		}
    +
    +		ensureSortableKey(field);
    +		keys.add(new Keys.ExpressionKeys<>(field, getType()));
    +		orders.add(order);
    +
     		return this;
     	}
     
    -	// --------------------------------------------------------------------------------------------
    -	//  Key Extraction
    -	// --------------------------------------------------------------------------------------------
    -
    -	private int[] getFlatFields(int field) {
    +	public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) {
    +		throw new InvalidProgramException("KeySelector cannot be chained.");
    +	}
     
    -		if (!Keys.ExpressionKeys.isSortKey(field, super.getType())) {
    +	private void ensureSortableKey(int field) throws InvalidProgramException {
    +		if (!Keys.ExpressionKeys.isSortKey(field, getType())) {
     			throw new InvalidProgramException("Selected sort key is not a sortable type");
     		}
    -
    -		Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(field, super.getType());
    -		return ek.computeLogicalKeyPositions();
     	}
     
    -	private int[] getFlatFields(String fields) {
    -
    -		if (!Keys.ExpressionKeys.isSortKey(fields, super.getType())) {
    +	private void ensureSortableKey(String field) throws InvalidProgramException {
    +		if (!Keys.ExpressionKeys.isSortKey(field, getType())) {
     			throw new InvalidProgramException("Selected sort key is not a sortable type");
     		}
    -
    -		Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(fields, super.getType());
    -		return ek.computeLogicalKeyPositions();
     	}
     
    -	private void appendSorting(int[] flatOrderFields, Order order) {
    -
    -		if(this.sortKeyPositions == null) {
    -			// set sorting info
    -			this.sortKeyPositions = flatOrderFields;
    -			this.sortOrders = new Order[flatOrderFields.length];
    -			Arrays.fill(this.sortOrders, order);
    -		} else {
    -			// append sorting info to exising info
    -			int oldLength = this.sortKeyPositions.length;
    -			int newLength = oldLength + flatOrderFields.length;
    -			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
    -			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
    -
    -			for(int i=0; i<flatOrderFields.length; i++) {
    -				this.sortKeyPositions[oldLength+i] = flatOrderFields[i];
    -				this.sortOrders[oldLength+i] = order;
    -			}
    +	private void ensureSortableKey(Keys<T> sortKey) {
    --- End diff --
    
    Change the `sortKey` parameter type to `SelectorFunctionKeys` and remove the type check + cast.
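
The effect of this suggestion can be sketched with a hypothetical key hierarchy. The real Flink `Keys` classes and the original body of `ensureSortableKey` are not reproduced here; the "before" method below is an assumed shape, inferred only from the mention of a type check and a cast.

```java
// Hypothetical key hierarchy; not the real Flink `Keys` classes.
final class NarrowParameterSketch {
    static class Keys { }

    static final class SelectorFunctionKeys extends Keys {
        final boolean sortable;

        SelectorFunctionKeys(boolean sortable) {
            this.sortable = sortable;
        }
    }

    // Before (assumed shape): accepts any Keys, so it needs a runtime check and a cast.
    static void ensureSortableKeyLoose(Keys sortKey) {
        if (!(sortKey instanceof SelectorFunctionKeys)) {
            throw new IllegalArgumentException("Expected selector function keys");
        }
        SelectorFunctionKeys selectorKeys = (SelectorFunctionKeys) sortKey;
        if (!selectorKeys.sortable) {
            throw new IllegalStateException("Selected sort key is not a sortable type");
        }
    }

    // After: the parameter type is narrowed, so the compiler enforces it
    // and both the type check and the cast disappear.
    static void ensureSortableKey(SelectorFunctionKeys sortKey) {
        if (!sortKey.sortable) {
            throw new IllegalStateException("Selected sort key is not a sortable type");
        }
    }

    public static void main(String[] args) {
        ensureSortableKey(new SelectorFunctionKeys(true)); // passes silently
        try {
            ensureSortableKey(new SelectorFunctionKeys(false));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```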



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52282391
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SortPartitionITCase.scala ---
    @@ -166,6 +167,58 @@ class SortPartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestB
         TestBaseUtils.compareResultAsText(result.asJava, expected)
       }
     
    +  @Test
    +  def testSortPartitionWithKeySelector1(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(4)
    +    val ds = CollectionDataSets.get3TupleDataSet(env)
    +
    +    val result = ds
    +      .map { x => x }.setParallelism(4)
    +      .sortPartition(_._2, Order.DESCENDING)
    --- End diff --
    
    Change the sort order to `ASCENDING` (here or in the other test).



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52282336
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java ---
    @@ -197,6 +198,58 @@ public void testSortPartitionParallelismChange() throws Exception {
     		compareResultAsText(result, expected);
     	}
     
    +	@Test
    +	public void testSortPartitionWithKeySelector1() throws Exception {
    +		/*
    +		 * Test sort partition on an extracted key
    +		 */
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(4);
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		List<Tuple1<Boolean>> result = ds
    +			.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
    +			.sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
    +				@Override
    +				public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
    +					return value.f1;
    +				}
    +			}, Order.DESCENDING)
    --- End diff --
    
    Change the sort order to `ASCENDING` (here or in the other test).



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-180268588
  
    Hi @chiwanpark, thanks for the update!
    The refactorings you proposed make a lot of sense. +1
    
    Thanks, Fabian



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52098334
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -79,16 +112,33 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     	 * local partition sorting of the DataSet.
     	 *
     	 * @param field The field expression referring to the field of the additional sort order of
    -	 *                 the local partition sorting.
    -	 * @param order The order  of the additional sort order of the local partition sorting.
    +	 *              the local partition sorting.
    +	 * @param order The order of the additional sort order of the local partition sorting.
     	 * @return The DataSet with sorted local partitions.
     	 */
     	public SortPartitionOperator<T> sortPartition(String field, Order order) {
    +		if (useKeySelector) {
    +			throw new InvalidProgramException("Expression keys cannot be appended after selector function keys");
    +		}
    +
     		int[] flatOrderKeys = getFlatFields(field);
     		this.appendSorting(flatOrderKeys, order);
     		return this;
     	}
     
    +	/**
    +	 * Appends an additional sort order with the specified field in the specified order to the
    +	 * local partition sorting of the DataSet.
    +	 *
    +	 * @param keyExtractor The KeySelector function which extracts the key value of the additional
    +	 *                     sort order of the local partition sorting.
    +	 * @param order        The order of the additional sort order of the local partition sorting.
    +	 * @return The DataSet with sorted local partitions.
    +	 */
    +	public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) {
    --- End diff --
    
    If we remove this method, the following code can be executed:
    
    ```java
    DataSet<MyObject> data = ...
    DataSet<MyObject> result = data
      .sortPartition(new KeySelector<MyObject, Integer>() {
        public Integer getKey(MyObject value) throws Exception {
          return value.myInt;
        }
      }, Order.ASCENDING)
      .sortPartition(new KeySelector<MyObject, String>() {
        public String getKey(MyObject value) throws Exception {
          return value.myString;
        }
      }, Order.ASCENDING);
    ```
    
    In the above case, `data` is sorted twice (first by Integer, then by String) and the result of the first sort is ignored. I think this is quite confusing for users.
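
The workaround discussed elsewhere in this thread, a single key selector that returns a composite of both values, can be sketched in plain Java. This is an illustration under assumptions, not Flink code: `MyObject` mirrors the fields used in the comment above, while `IntStringKey`, `compositeKey`, and `sortByIntThenString` are hypothetical names, and a small comparable pair stands in for a Flink tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration only; none of these names are Flink classes.
final class CompositeKeySketch {

    // Hypothetical record type with the two fields used in the comment above.
    static final class MyObject {
        final int myInt;
        final String myString;

        MyObject(int myInt, String myString) {
            this.myInt = myInt;
            this.myString = myString;
        }

        @Override
        public String toString() {
            return myInt + ":" + myString;
        }
    }

    // Composite key: compares the int first, then the string as a tie-breaker.
    static final class IntStringKey implements Comparable<IntStringKey> {
        final int first;
        final String second;

        IntStringKey(int first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(IntStringKey other) {
            int byInt = Integer.compare(first, other.first);
            return byInt != 0 ? byInt : second.compareTo(other.second);
        }
    }

    // The single "key selector": one call returns both sort values.
    static IntStringKey compositeKey(MyObject value) {
        return new IntStringKey(value.myInt, value.myString);
    }

    // One sort pass over a copy of the partition, ordered by the composite key.
    static List<MyObject> sortByIntThenString(List<MyObject> partition) {
        List<MyObject> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparing(CompositeKeySketch::compositeKey));
        return sorted;
    }

    public static void main(String[] args) {
        List<MyObject> partition = Arrays.asList(
            new MyObject(2, "b"), new MyObject(1, "z"),
            new MyObject(2, "a"), new MyObject(1, "a"));
        System.out.println(sortByIntThenString(partition));
        // prints [1:a, 1:z, 2:a, 2:b]
    }
}
```

With one composite key there is only one sort, so no earlier ordering can be silently discarded.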



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1585



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52282275
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
    @@ -1508,6 +1508,31 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
           new SortPartitionOperator[T](javaSet, field, order, getCallLocationName()))
       }
     
    +  /**
    +    * Locally sorts the partitions of the DataSet on the specified field in the specified order.
    +    * The DataSet can be sorted on multiple fields by chaining sortPartition() calls.
    +    *
    +    * Note that any key extraction methods cannot be chained with the KeySelector. To sort the
    +    * partition by multiple values using KeySelector, the KeySelector must return a tuple
    +    * consisting of the values.
    +    */
    +  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T] ={
    --- End diff --
    
    Copy the method docs from the `DataSet.java`. 



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51993309
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/PartitionSortedDataSet.scala ---
    @@ -35,16 +37,30 @@ class PartitionSortedDataSet[T: ClassTag](set: SortPartitionOperator[T])
        * Appends the given field and order to the sort-partition operator.
        */
       override def sortPartition(field: Int, order: Order): DataSet[T] = {
    +    if (set.useKeySelector()) {
    +      throw new InvalidProgramException("Expression keys cannot be appended after selector " +
    +        "function keys")
    +    }
    +
         this.set.sortPartition(field, order)
         this
       }
     
    -/**
    - * Appends the given field and order to the sort-partition operator.
    - */
    +  /**
    +   * Appends the given field and order to the sort-partition operator.
    +   */
       override def sortPartition(field: String, order: Order): DataSet[T] = {
    +    if (set.useKeySelector()) {
    +      throw new InvalidProgramException("Expression keys cannot be appended after selector " +
    +        "function keys")
    +    }
    +
         this.set.sortPartition(field, order)
         this
       }
     
    +  override def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T] = {
    --- End diff --
    
    remove method.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-181833677
  
    Thanks for the review, @fhueske. I have addressed your comments. :-)



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-179835764
  
    Hi @chiwanpark, thanks for picking up this issue and filling this gap in the DataSet API. 
    It would be nice if it could make it into the 1.0 release, IMO.
    However, the implementation needs to be improved. I suggest having a look at the `PartitionOperator` and checking how `KeySelector`s are handled there.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51870056
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,20 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the specified field in the specified order.
    +	 * DataSet can be sorted on multiple fields by chaining sortPartition() calls.
    --- End diff --
    
    I would not allow chaining of KeySelectors. We only allow a single key extractor anywhere else in the DataSet API (for a good reason). If users want to sort on more than one field, they can return a `TupleX` from the `KeySelector`.
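    
    As a rough illustration of the single-extractor approach, the sketch below uses plain Java rather than the Flink API. `MyObject`, `Key2`, `compositeKey` and `sortByKey` are hypothetical stand-ins invented for this example (`Key2` plays the role of a two-field tuple); the point is only that one key extractor returning a composite key yields a multi-field sort order.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.function.Function;
    
    class CompositeKeySketch {
    
        // Hypothetical record type standing in for the user's data type.
        static final class MyObject {
            final int myInt;
            final String myString;
    
            MyObject(int myInt, String myString) {
                this.myInt = myInt;
                this.myString = myString;
            }
        }
    
        // Hypothetical two-field key standing in for a tuple; it compares the
        // first field and falls back to the second field on a tie.
        static final class Key2 implements Comparable<Key2> {
            final int first;
            final String second;
    
            Key2(int first, String second) {
                this.first = first;
                this.second = second;
            }
    
            @Override
            public int compareTo(Key2 other) {
                int byFirst = Integer.compare(first, other.first);
                return byFirst != 0 ? byFirst : second.compareTo(other.second);
            }
        }
    
        // The single key extractor: both sort values travel in one composite key.
        static Key2 compositeKey(MyObject value) {
            return new Key2(value.myInt, value.myString);
        }
    
        // Sorts a copy of one partition by the key that the extractor returns.
        static <T, K extends Comparable<K>> List<T> sortByKey(List<T> partition, Function<T, K> extractor) {
            List<T> sorted = new ArrayList<>(partition);
            sorted.sort(Comparator.comparing(extractor));
            return sorted;
        }
    
        public static void main(String[] args) {
            List<MyObject> partition = Arrays.asList(
                new MyObject(2, "a"), new MyObject(1, "b"), new MyObject(1, "a"));
            for (MyObject o : sortByKey(partition, CompositeKeySketch::compositeKey)) {
                System.out.println(o.myInt + "," + o.myString);
            }
        }
    }
    ```
    
    Because the whole ordering lives in one key, there is no second extractor whose relationship to the first would have to be defined.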



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-180224231
  
    @fhueske I addressed your comments.
    
    To avoid modifying the original code too much, I added some variables that are used only for the key selector, but this makes the source code quite messy. I would like to refactor `SortPartitionOperator` as follows:
    
    * Preserve key information and sort orders as `List<Keys>` and `List<Order>` objects.
      * We have to create a list because users can chain expression keys.
    * Check whether the given field is sortable in the constructors and in the `sortPartition` method of `SortPartitionOperator`.
    * Remove `sortKeyPositions`, `sortOrders`, `getFlatFields` and `appendSorting`.
    * When the `translateToDataFlow` method is called, it fetches the key positions from the preserved key objects and creates the operators.
    
    Does this make sense?
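    
    A minimal plain-Java sketch of the structure proposed above, under loud assumptions: it does not use Flink types, keys are reduced to descriptive strings, `IllegalStateException` stands in for `InvalidProgramException`, and `SortPartitionSketch`, `onField`, `onKeySelector` and `translate` are hypothetical names chosen for illustration. It only shows keys and orders being collected in lists, the sortability check happening at call time, and resolution being deferred to a later translation step.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    class SortPartitionSketch {
    
        enum Order { ASCENDING, DESCENDING }
    
        // Stand-ins for the List<Keys> and List<Order> from the proposal;
        // a key is just a descriptive string in this sketch.
        private final List<String> keys = new ArrayList<>();
        private final List<Order> orders = new ArrayList<>();
        private final boolean useKeySelector;
    
        private SortPartitionSketch(String key, Order order, boolean useKeySelector) {
            this.useKeySelector = useKeySelector;
            keys.add(key);
            orders.add(order);
        }
    
        // Starts a sort specification from a field expression.
        static SortPartitionSketch onField(String field, Order order) {
            return new SortPartitionSketch(ensureSortableKey(field), order, false);
        }
    
        // Starts a sort specification from a (named) key selector.
        static SortPartitionSketch onKeySelector(String selectorName, Order order) {
            return new SortPartitionSketch("selector:" + selectorName, order, true);
        }
    
        // Appends a further field expression; rejected after a key selector.
        SortPartitionSketch sortPartition(String field, Order order) {
            if (useKeySelector) {
                throw new IllegalStateException(
                    "Expression keys cannot be appended after selector function keys");
            }
            keys.add(ensureSortableKey(field));
            orders.add(order);
            return this;
        }
    
        // Placeholder check; a real implementation would inspect the field's type.
        private static String ensureSortableKey(String field) {
            if (field == null || field.isEmpty()) {
                throw new IllegalArgumentException("Field is not sortable");
            }
            return field;
        }
    
        // Keys are resolved only here, analogous to deferring the work
        // until the data flow is translated.
        List<String> translate() {
            List<String> plan = new ArrayList<>();
            for (int i = 0; i < keys.size(); i++) {
                plan.add(keys.get(i) + " " + orders.get(i));
            }
            return plan;
        }
    
        public static void main(String[] args) {
            System.out.println(
                onField("f0", Order.ASCENDING).sortPartition("f1", Order.DESCENDING).translate());
        }
    }
    ```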



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51869883
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,20 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the specified field in the specified order.
    --- End diff --
    
    "on the specified field" -> "on an extracted key"



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51992647
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -79,16 +112,33 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd
     	 * local partition sorting of the DataSet.
     	 *
     	 * @param field The field expression referring to the field of the additional sort order of
    -	 *                 the local partition sorting.
    -	 * @param order The order  of the additional sort order of the local partition sorting.
    +	 *              the local partition sorting.
    +	 * @param order The order of the additional sort order of the local partition sorting.
     	 * @return The DataSet with sorted local partitions.
     	 */
     	public SortPartitionOperator<T> sortPartition(String field, Order order) {
    +		if (useKeySelector) {
    +			throw new InvalidProgramException("Expression keys cannot be appended after selector function keys");
    +		}
    +
     		int[] flatOrderKeys = getFlatFields(field);
     		this.appendSorting(flatOrderKeys, order);
     		return this;
     	}
     
    +	/**
    +	 * Appends an additional sort order with the specified field in the specified order to the
    +	 * local partition sorting of the DataSet.
    +	 *
    +	 * @param keyExtractor The KeySelector function which extracts the key value of the additional
    +	 *                     sort order of the local partition sorting.
    +	 * @param order        The order of the additional sort order of the local partition sorting.
    +	 * @return The DataSet with sorted local partitions.
    +	 */
    +	public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) {
    --- End diff --
    
    If you remove this method, it cannot be called ;-)



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-181917425
  
    Will merge this PR.



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r52282118
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java ---
    @@ -36,27 +40,58 @@
      */
     public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPartitionOperator<T>> {
     
    -	private int[] sortKeyPositions;
    +	private List<Keys<T>> keys;
     
    -	private Order[] sortOrders;
    +	private List<Order> orders;
     
     	private final String sortLocationName;
     
    +	private boolean useKeySelector;
     
    -	public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order sortOrder, String sortLocationName) {
    +	private SortPartitionOperator(DataSet<T> dataSet, String sortLocationName) {
     		super(dataSet, dataSet.getType());
    +
    +		keys = new ArrayList<>();
    +		orders = new ArrayList<>();
     		this.sortLocationName = sortLocationName;
    +	}
    +
    +
    +	public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order sortOrder, String sortLocationName) {
    +		this(dataSet, sortLocationName);
    +		this.useKeySelector = false;
    +
    +		ensureSortableKey(sortField);
     
    -		int[] flatOrderKeys = getFlatFields(sortField);
    -		this.appendSorting(flatOrderKeys, sortOrder);
    +		keys.add(new Keys.ExpressionKeys<>(sortField, getType()));
    +		orders.add(sortOrder);
     	}
     
     	public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrder, String sortLocationName) {
    -		super(dataSet, dataSet.getType());
    -		this.sortLocationName = sortLocationName;
    +		this(dataSet, sortLocationName);
    +		this.useKeySelector = false;
    +
    +		ensureSortableKey(sortField);
    +
    +		keys.add(new Keys.ExpressionKeys<>(sortField, getType()));
    +		orders.add(sortOrder);
    +	}
    +
    +	public SortPartitionOperator(DataSet<T> dataSet, Keys<T> sortKey, Order sortOrder, String sortLocationName) {
    --- End diff --
    
    Change the `sortKey` parameter type to `SelectorFunctionKeys` (or accept the `KeySelector` and create the `SelectorFunctionKeys` in the constructor).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51993427
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java ---
    @@ -197,6 +198,58 @@ public void testSortPartitionParallelismChange() throws Exception {
     		compareResultAsText(result, expected);
     	}
     
    +	@Test
    +	public void testSortPartitionWithKeySelector1() throws Exception {
    +		/*
    +		 * Test sort partition on an extracted key
    +		 */
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(4);
    +
    +		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
    +		List<Tuple1<Boolean>> result = ds
    +			.map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
    +			.sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Integer>() {
    +				@Override
    +				public Integer getKey(Tuple3<Integer, Long, String> value) throws Exception {
    +					return value.f0;
    --- End diff --
    
    return some other field than `f0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1585#issuecomment-181845299
  
    Thanks for the fast update!
    Good to merge :-)



[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1585#discussion_r51992451
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1377,6 +1377,20 @@ public long count() throws Exception {
     		return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName());
     	}
     
    +	/**
    +	 * Locally sorts the partitions of the DataSet on the an extracted key in the specified order.
    +	 * DataSet can be sorted on multiple values by returning a tuple.
    --- End diff --
    
    "returning a tuple" + "from the KeySelector."

