Posted to issues@flink.apache.org by rmetzger <gi...@git.apache.org> on 2015/02/02 10:46:09 UTC

[GitHub] flink pull request: [FLINK-1105] Add support for locally sorted ou...

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/347#discussion_r23914605
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---
    @@ -83,6 +93,107 @@ public DataSink(DataSet<T> data, OutputFormat<T> format, TypeInformation<T> type
     	}
     
     	/**
    +	 * Sorts each local partition of a {@link org.apache.flink.api.java.tuple.Tuple} data set
    +	 * on the specified field in the specified {@link Order} before it is emitted by the output format.</br>
    +	 * <b>Note: Only tuple data sets can be sorted using integer field indices.</b><br/>
    +	 * The tuple data set can be sorted on multiple fields in different orders
    +	 * by chaining {@link #sortLocalOutput(int, Order)} calls.
    +	 *
    +	 * @param field The Tuple field on which the data set is locally sorted.
    +	 * @param order The Order in which the specified Tuple field is locally sorted.
    +	 * @return This data sink operator with specified output order.
    +	 *
    +	 * @see org.apache.flink.api.java.tuple.Tuple
    +	 * @see Order
    +	 */
    +	public DataSink<T> sortLocalOutput(int field, Order order) {
    +
    +		if (!this.type.isTupleType()) {
    +			throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
    +		}
    +		if (field >= this.type.getArity()) {
    +			throw new InvalidProgramException("Order key out of tuple bounds.");
    +		}
    +
    +		if(this.sortKeyPositions == null) {
    +			// set sorting info
    +			this.sortKeyPositions = new int[] {field};
    +			this.sortOrders = new Order[] {order};
    +		} else {
    +			// append sorting info to exising info
    +			int newLength = this.sortKeyPositions.length + 1;
    +			this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
    +			this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
    +			this.sortKeyPositions[newLength-1] = field;
    +			this.sortOrders[newLength-1] = order;
    +		}
    +		return this;
    +	}
    +
    +	/**
    +	 * Sorts each local partition of a data set on the field(s) specified by the field expression
    +	 * in the specified {@link Order} before it is emitted by the output format.</br>
    +	 * <b>Note: Non-composite types can only be sorted on the full element which is specified by
    +	 * a wildcard expression ("*" or "_").</b><br/>
    +	 * Data sets of composite types (Tuple or Pojo) can be sorted on multiple fields in different orders
    +	 * by chaining {@link #sortLocalOutput(String, Order)} calls.
    +	 *
    +	 * @param fieldExpression The field expression for the field(s) on which the data set is locally sorted.
    +	 * @param order The Order in which the specified field(s) are locally sorted.
    +	 * @return This data sink operator with specified output order.
    +	 *
    +	 * @see Order
    +	 */
    +	public DataSink<T> sortLocalOutput(String fieldExpression, Order order) {
    +
    +		int numFields;
    +		int[] fields;
    +		Order[] orders;
    +
    +		if(this.type instanceof CompositeType) {
    +			// compute flat field positions for (nested) sorting fields
    +			Keys.ExpressionKeys<T> ek;
    +			try {
    +				ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type);
    +			} catch(IllegalArgumentException iae) {
    +				throw new InvalidProgramException(iae.getMessage());
    --- End diff --
    
    Why create a new exception from only the error message instead of forwarding the IllegalArgumentException (for example, as the cause)?
    I personally like being able to find the exact location where an exception was originally thrown, and copying just the message drops the original stack trace.
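
A minimal sketch of the difference, for illustration only. `SketchProgramException` and `parseFieldExpression` are hypothetical stand-ins, not Flink classes or methods; whether `InvalidProgramException` offers a constructor that accepts a cause is not shown in the diff, so that is left as an assumption.

```java
class CauseForwardingSketch {

    // Hypothetical stand-in for the program exception in the diff; not the real Flink class.
    static class SketchProgramException extends RuntimeException {
        SketchProgramException(String message) {
            super(message);
        }

        SketchProgramException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for the key-parsing step that may reject an expression.
    static void parseFieldExpression(String fieldExpression) {
        if (fieldExpression == null || fieldExpression.isEmpty()) {
            throw new IllegalArgumentException("Field expression must not be empty.");
        }
    }

    // Pattern from the diff: only the message is copied, so the original stack trace is lost.
    static void messageOnly(String fieldExpression) {
        try {
            parseFieldExpression(fieldExpression);
        } catch (IllegalArgumentException iae) {
            throw new SketchProgramException(iae.getMessage());
        }
    }

    // Alternative: pass the original exception along as the cause, so its stack trace is kept.
    static void withCause(String fieldExpression) {
        try {
            parseFieldExpression(fieldExpression);
        } catch (IllegalArgumentException iae) {
            throw new SketchProgramException(iae.getMessage(), iae);
        }
    }

    public static void main(String[] args) {
        try {
            messageOnly("");
        } catch (SketchProgramException e) {
            System.out.println("message only, cause: " + e.getCause());
        }
        try {
            withCause("");
        } catch (SketchProgramException e) {
            System.out.println("with cause, cause: " + e.getCause());
        }
    }
}
```

With the cause attached, a printed stack trace includes a "Caused by:" section that points at the line inside the parsing step, which is the location the comment above asks for.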

