Posted to issues@flink.apache.org by qmlmoon <gi...@git.apache.org> on 2014/11/17 16:39:09 UTC

[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

GitHub user qmlmoon opened a pull request:

    https://github.com/apache/incubator-flink/pull/209

    [FLINK-1112] Add KeySelector group sorting on KeySelector grouping

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/qmlmoon/incubator-flink groupsort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #209
    
----
commit f8e4afc56953a7462c32ce4fd69af65d298f64a5
Author: mingliang <qm...@gmail.com>
Date:   2014-11-16T20:29:47Z

    [FLINK-1112] Add KeySelector group sorting on KeySelector grouping

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21094587
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---
    @@ -83,6 +84,30 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
     		this.groupSortOrders = new Order[groupSortKeyPositions.length];
     		Arrays.fill(this.groupSortOrders, order); // if field == "*"
     	}
    +
    +	/*
    +	 * KeySelector sorting for Pojos and tuples
    --- End diff --
    
    KeySelectors are not restricted to Pojos and Tuples.



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21094820
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---
    @@ -83,6 +84,30 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
     		this.groupSortOrders = new Order[groupSortKeyPositions.length];
     		Arrays.fill(this.groupSortOrders, order); // if field == "*"
     	}
    +
    +	/*
    +	 * KeySelector sorting for Pojos and tuples
    +	 */
    +	public <K> SortedGrouping(DataSet<T> set, Keys<T> keys, Keys.SelectorFunctionKeys<T, K> keySelector, Order order) {
    +		super(set, keys);
    +
    +		if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
    +			throw new InvalidProgramException("Sorting on KeySelector only works for KeySelector grouping.");
    +		}
    +
    +		if (!(dataSet.getType() instanceof CompositeType)) {
    +			throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
    --- End diff --
    
    The error message says "field positions", but we are using KeySelectors here.



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21099693
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators.translation;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.util.Collector;
    +
    +/**
    + * Needed to wrap tuples to Tuple3<groupKey, sortKey, value> for combine method of group reduce with key selector sorting
    + */
    +public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private final Tuple3UnwrappingIterator<IN, K1, K2> tui;
    +	private final Tuple3<K1, K2, IN> outTuple;
    +
    +	private Collector<Tuple3<K1, K2, IN>> wrappedCollector;
    +
    +
    +	public Tuple3WrappingCollector(Tuple3UnwrappingIterator<IN, K1, K2> tui) {
    +		this.tui = tui;
    +		this.outTuple = new Tuple3<K1, K2, IN>();
    +	}
    +
    +	public void set(Collector<Tuple3<K1, K2, IN>> wrappedCollector) {
    +		this.wrappedCollector = wrappedCollector;
    +	}
    +
    +	@Override
    +	public void close() {
    +		this.wrappedCollector.close();
    +	}
    +
    +	@Override
    +	public void collect(IN record) {
    +		this.outTuple.f0 = this.tui.getLastKey();
    +		this.outTuple.f2 = record;
    --- End diff --
    
    The sort key must also be forwarded by the combiner collector.
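
A minimal sketch of this point, assuming the combiner output is re-wrapped into (groupKey, sortKey, value) triples. `Triple` and `RewrappingCollector` are hypothetical stand-ins, not the Flink classes in the diff, and the two suppliers stand in for whatever component tracks the keys of the last record read.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration; not the Flink classes from the diff.
class WrappingCollectorSketch {

    // Minimal (groupKey, sortKey, value) triple, standing in for Tuple3.
    static final class Triple<K1, K2, T> {
        final K1 groupKey;
        final K2 sortKey;
        final T value;

        Triple(K1 groupKey, K2 sortKey, T value) {
            this.groupKey = groupKey;
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Re-wraps each record emitted by a combiner with BOTH keys, so that a
    // later sort on the second field still finds the sort key.
    static final class RewrappingCollector<T, K1, K2> {
        private final Supplier<K1> lastGroupKey;
        private final Supplier<K2> lastSortKey;
        private final List<Triple<K1, K2, T>> out;

        RewrappingCollector(Supplier<K1> lastGroupKey,
                            Supplier<K2> lastSortKey,
                            List<Triple<K1, K2, T>> out) {
            this.lastGroupKey = lastGroupKey;
            this.lastSortKey = lastSortKey;
            this.out = out;
        }

        void collect(T record) {
            // Allocates a new triple per record for clarity; the class in the
            // diff reuses a single output tuple instead.
            out.add(new Triple<K1, K2, T>(lastGroupKey.get(), lastSortKey.get(), record));
        }
    }
}
```

If only the group key and the value were set, the middle field would be left empty (or stale, when the output tuple is reused), which is presumably what the comment is guarding against.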



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/209#issuecomment-64914086
  
    I think the change is good to merge



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21094436
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---
    @@ -230,5 +231,21 @@ public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
     	public SortedGrouping<T> sortGroup(String field, Order order) {
     		return new SortedGrouping<T>(this.dataSet, this.keys, field, order);
     	}
    +
    +	/**
    +	 * Sorts Pojos or Tuple within a group with specified {@link org.apache.flink.api.java.functions.KeySelector} in the specified {@link Order}.</br>
    --- End diff --
    
    The JavaDoc documentation needs some work, IMO.
    KeySelectors are not restricted to Pojos and Tuples. They can extract a key from any data type.
    I would phrase it as: "Sorts elements within a group on a key extracted by the specified KeySelector in the specified order."
    
    The `@param order` refers to a field which has not been mentioned before. It should refer to the key extracted by the KeySelector.



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21099647
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators.translation;
    +
    +import java.util.Iterator;
    +
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.util.TraversableOnceException;
    +
    +/**
    +	 * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
    +	 * The iterator also tracks the groupKeys, as the triples flow through it.
    + */
    +public class Tuple3UnwrappingIterator<T, K1, K2> implements Iterator<T>, Iterable<T>, java.io.Serializable {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +	private K1 lastKey;
    --- End diff --
    
    The sort key must also be remembered.
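
A minimal sketch of an unwrapping iterator that remembers both keys. It does not use the Flink classes from the diff: `Triple` and `KeyTrackingIterator` are hypothetical stand-ins for `Tuple3` and `Tuple3UnwrappingIterator`, and the accessor name `getLastSortKey` is an assumption, not something the pull request defines.

```java
import java.util.Iterator;

// Hypothetical stand-ins for illustration; not the Flink classes from the diff.
class UnwrappingIteratorSketch {

    // Minimal (groupKey, sortKey, value) triple, standing in for Tuple3.
    static final class Triple<K1, K2, T> {
        final K1 groupKey;
        final K2 sortKey;
        final T value;

        Triple(K1 groupKey, K2 sortKey, T value) {
            this.groupKey = groupKey;
            this.sortKey = sortKey;
            this.value = value;
        }
    }

    // Hands out only the values, but records the group key AND the sort key
    // of the triple most recently returned by next().
    static final class KeyTrackingIterator<T, K1, K2> implements Iterator<T> {
        private final Iterator<Triple<K1, K2, T>> source;
        private K1 lastGroupKey;
        private K2 lastSortKey;

        KeyTrackingIterator(Iterator<Triple<K1, K2, T>> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            return source.hasNext();
        }

        @Override
        public T next() {
            Triple<K1, K2, T> t = source.next();
            lastGroupKey = t.groupKey;
            lastSortKey = t.sortKey;
            return t.value;
        }

        K1 getLastGroupKey() {
            return lastGroupKey;
        }

        K2 getLastSortKey() {
            return lastSortKey;
        }
    }
}
```

A collector that re-wraps combiner output could then read both accessors after each call to `next()`.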



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/incubator-flink/pull/209#issuecomment-65091843
  
    Thanks for the pull request! 
    I added some comments in-line.
    
    Could you also add the following tests to the Java and Scala integration tests:
    - KeyExtractor returns a Tuple2 as key
    - GroupReduce function is combinable (including the checks that the elements are processed by the combiner in the right order).
    
    Thank you very much!
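
As a rough illustration of the first requested case, the following plain-Java sketch shows the ordering such a test would assert when the sort key has two fields. It deliberately avoids the Flink API: `Key2`, `Rec`, and `namesInKeyOrder` are hypothetical names, and `Key2` only stands in for a `Tuple2` key by comparing field by field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; not Flink test code.
class CompositeSortKeySketch {

    // Two-field key compared field by field, standing in for a Tuple2 key.
    static final class Key2 implements Comparable<Key2> {
        final int first;
        final String second;

        Key2(int first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public int compareTo(Key2 other) {
            int c = Integer.compare(first, other.first);
            return c != 0 ? c : second.compareTo(other.second);
        }
    }

    // A record of one group; rank and name together form the sort key.
    static final class Rec {
        final int rank;
        final String name;

        Rec(int rank, String name) {
            this.rank = rank;
            this.name = name;
        }
    }

    // Returns the names of one group's records in ascending order of the
    // key extracted by sortKey.
    static List<String> namesInKeyOrder(List<Rec> group, Function<Rec, Key2> sortKey) {
        List<Rec> sorted = new ArrayList<Rec>(group);
        sorted.sort((a, b) -> sortKey.apply(a).compareTo(sortKey.apply(b)));
        List<String> names = new ArrayList<String>();
        for (Rec r : sorted) {
            names.add(r.name);
        }
        return names;
    }
}
```

An integration test along these lines would compare the order in which the reduce function (and, for the second case, the combiner) sees the elements against the expected order.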



[GitHub] incubator-flink pull request: [FLINK-1112] Add KeySelector group s...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/209#discussion_r21094766
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---
    @@ -83,6 +84,30 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
     		this.groupSortOrders = new Order[groupSortKeyPositions.length];
     		Arrays.fill(this.groupSortOrders, order); // if field == "*"
     	}
    +
    +	/*
    +	 * KeySelector sorting for Pojos and tuples
    +	 */
    +	public <K> SortedGrouping(DataSet<T> set, Keys<T> keys, Keys.SelectorFunctionKeys<T, K> keySelector, Order order) {
    +		super(set, keys);
    +
    +		if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
    +			throw new InvalidProgramException("Sorting on KeySelector only works for KeySelector grouping.");
    +		}
    +
    +		if (!(dataSet.getType() instanceof CompositeType)) {
    --- End diff --
    
    I don't think we need this restriction.
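
To illustrate why the composite-type check looks unnecessary: a key selector can derive a sort key from an atomic element such as a `String`. The sketch below is plain Java under that assumption. The local `KeySelector` interface is a simplified stand-in for Flink's, and `sortGroup` here is a hypothetical helper that sorts a single in-memory group, not the method under review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not the Flink classes from the diff.
class AtomicTypeKeySelectorSketch {

    // Simplified stand-in for a key selector: extracts a key from any element type.
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    // Sorts the elements of one group in ascending order of the extracted key.
    // Nothing here requires T to be a tuple, POJO, or other composite type.
    static <T, K extends Comparable<K>> List<T> sortGroup(List<T> group, KeySelector<T, K> selector) {
        List<T> sorted = new ArrayList<T>(group);
        sorted.sort((a, b) -> selector.getKey(a).compareTo(selector.getKey(b)));
        return sorted;
    }
}
```

For example, sorting a group of strings by length needs only a selector such as `s -> s.length()`.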

