Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/02/02 10:46:35 UTC

[jira] [Commented] (FLINK-1105) Add support for locally sorted output

    [ https://issues.apache.org/jira/browse/FLINK-1105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14301080#comment-14301080 ] 

ASF GitHub Bot commented on FLINK-1105:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/347#discussion_r23914605
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---
    @@ -83,6 +93,107 @@ public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type
     	}
     
     	/**
    +	 * Sorts each local partition of a {@link org.apache.flink.api.java.tuple.Tuple} data set
    +	 * on the specified field in the specified {@link Order} before it is emitted by the output format.<br/>
    +	 * <b>Note: Only tuple data sets can be sorted using integer field indices.</b><br/>
    +	 * The tuple data set can be sorted on multiple fields in different orders
    +	 * by chaining {@link #sortLocalOutput(int, Order)} calls.
    +	 *
    +	 * @param field The Tuple field on which the data set is locally sorted.
    +	 * @param order The Order in which the specified Tuple field is locally sorted.
    +	 * @return This data sink operator with specified output order.
    +	 *
    +	 * @see org.apache.flink.api.java.tuple.Tuple
    +	 * @see Order
    +	 */
    +	public DataSink<T> sortLocalOutput(int field, Order order) {
    +
    +		if (!this.type.isTupleType()) {
    +			throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
    +		}
    +		if (field >= this.type.getArity()) {
    +			throw new InvalidProgramException("Order key out of tuple bounds.");
    +		}
    +
    +		if(this.sortKeyPositions == null) {
    +			// set sorting info
    +			this.sortKeyPositions = new int[] {field};
    +			this.sortOrders = new Order[] {order};
    +		} else {
    +			// append sorting info to existing info
    +			int newLength = this.sortKeyPositions.length + 1;
    +			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
    +			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
    +			this.sortKeyPositions[newLength-1] = field;
    +			this.sortOrders[newLength-1] = order;
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sorts each local partition of a data set on the field(s) specified by the field expression
    +	 * in the specified {@link Order} before it is emitted by the output format.<br/>
    +	 * <b>Note: Non-composite types can only be sorted on the full element which is specified by
    +	 * a wildcard expression ("*" or "_").</b><br/>
    +	 * Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
    +	 * by chaining {@link #sortLocalOutput(String, Order)} calls.
    +	 *
    +	 * @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
    +	 * @param order The Order in which the specified field(s) are locally sorted.
    +	 * @return This data sink operator with specified output order.
    +	 *
    +	 * @see Order
    +	 */
    +	public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {
    +
    +		int numFields;
    +		int[] fields;
    +		Order[] orders;
    +
    +		if(this.type instanceof CompositeType) {
    +			// compute flat field positions for (nested) sorting fields
    +			Keys.ExpressionKeys<T> ek;
    +			try {
    +				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
    +			} catch(IllegalArgumentException iae) {
    +				throw new InvalidProgramException(iae.getMessage());
    --- End diff --
    
    Why are you creating a new exception from just the error message instead of forwarding the IllegalArgumentException itself?
    Wrapping only the message drops the original stack trace, and I personally like it very much when I can find the exact location where the exception was thrown.
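
A minimal sketch of the alternative raised in this comment: keep the domain-specific exception type, but pass the caught exception along as its cause so the original stack trace survives. Both `InvalidProgramException` and `parseFieldExpression` below are hypothetical stand-ins written for this example. Whether Flink's own `InvalidProgramException` offers a `(String, Throwable)` constructor is an assumption that this thread does not confirm.

```java
public class ExceptionChainingSketch {

    /** Hypothetical stand-in for Flink's InvalidProgramException (assumed cause-accepting constructor). */
    static class InvalidProgramException extends RuntimeException {
        InvalidProgramException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical parser standing in for the Keys.ExpressionKeys constructor; rejects empty expressions. */
    static void parseFieldExpression(String expr) {
        if (expr == null || expr.isEmpty()) {
            throw new IllegalArgumentException("Field expression must not be empty.");
        }
    }

    static void sortLocalOutput(String fieldExpression) {
        try {
            parseFieldExpression(fieldExpression);
        } catch (IllegalArgumentException iae) {
            // Pass the caught exception as the cause so its stack trace is preserved.
            throw new InvalidProgramException(iae.getMessage(), iae);
        }
    }

    public static void main(String[] args) {
        try {
            sortLocalOutput("");
        } catch (InvalidProgramException e) {
            // The "Caused by:" section of the trace points into parseFieldExpression.
            e.printStackTrace();
        }
    }
}
```

With the cause attached, the printed trace contains a "Caused by" section that leads back to the line that originally threw, which is the information the comment asks to keep.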


> Add support for locally sorted output
> -------------------------------------
>
>                 Key: FLINK-1105
>                 URL: https://issues.apache.org/jira/browse/FLINK-1105
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Java API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> This feature will make it possible to sort the output that is sent to an OutputFormat in order to obtain a locally sorted result.
> This feature was available in the "old" Java API and has not been ported to the new Java API yet. Hence, the optimizer and runtime should already support this feature. However, the API and job generation parts are missing.
> It is also a subfeature of FLINK-598, which will also provide globally sorted results.
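
To illustrate the distinction drawn in the issue description, here is a small sketch in plain Java (not the Flink API) of what "locally sorted" means: every partition is sorted on its own, and no records move between partitions. The class name, the method name, and the use of nested lists as "partitions" are assumptions made for this illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LocalSortSketch {

    /** Sorts every partition independently; records never cross partition boundaries. */
    static List<List<Integer>> sortLocally(List<List<Integer>> partitions) {
        List<List<Integer>> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            List<Integer> sorted = new ArrayList<>(partition); // copy, leave the input untouched
            Collections.sort(sorted);
            result.add(sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(7, 1, 5),
                Arrays.asList(4, 9, 2));
        // Each inner list comes out ordered, but the concatenation is not globally sorted.
        System.out.println(sortLocally(partitions));
    }
}
```

A globally sorted result, as planned under FLINK-598, would additionally require that every record in one partition is ordered before every record in the next.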



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)