You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:57 UTC

[41/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
deleted file mode 100644
index 033e84f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code WindowedStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code WindowedStream} is purely an API construct, during runtime
- * the {@code WindowedStream} will be collapsed together with the
- * {@code KeyedStream} and the operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class WindowedStream<T, K, W extends Window> {
-
-	/** The keyed data stream that is windowed by this stream. */
-	private final KeyedStream<T, K> input;
-
-	/** The window assigner that splits the elements of each key into windows. */
-	private final WindowAssigner<? super T, W> windowAssigner;
-
-	/** The trigger that is used for window evaluation/emission; defaults to the assigner's default trigger. */
-	private Trigger<? super T, ? super W> trigger;
-
-	/** The evictor that is used for evicting elements before window evaluation; {@code null} if none is set. */
-	private Evictor<? super T, ? super W> evictor;
-
-	/** Creates a windowed view of {@code input} that uses the assigner's default trigger and no evictor. */
-	public WindowedStream(KeyedStream<T, K> input,
-			WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
-		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
-	}
-
-	/**
-	 * Sets the {@code Trigger} that should be used to trigger window emission, replacing the default trigger.
-	 */
-	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
-		this.trigger = trigger;
-		return this;
-	}
-
-	/**
-	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-	 *
-	 * <p>
-	 * Note: When using an evictor, window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
-	 */
-	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
-		this.evictor = evictor;
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Operations on the keyed windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce function to the window. The window function is called for each evaluation
-	 * of the window for each key individually. The output of the reduce function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * This window will try to pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
-	 * so a few elements are stored per key (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree. If an evictor is set, all elements are buffered without pre-aggregation.
-	 *
-	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window.
-	 */
-	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
-		// try a specialized operator for processing-time tumbling/sliding time windows first
-		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, T> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					new ReduceWindowFunction<K, W, T>(function),
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-
-		} else {
-			// copy the function: the pre-aggregating buffer needs its own instance, separate from the one in ReduceWindowFunction
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
-			operator = new WindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
-					new ReduceWindowFunction<K, W, T>(function),
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, input.getType(), operator);
-	}
-
-	/**
-	 * Applies the given fold function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the fold function is
-	 * interpreted as a regular non-windowed stream.
-	 * @param initialValue The initial value of the fold.
-	 * @param function The fold function.
-	 * @return The data stream that is the result of applying the fold function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
-				Utils.getCallLocationName(), true);
-
-		return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
-	}
-
-	/**
-	 * Applies the given fold function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the fold function is
-	 * interpreted as a regular non-windowed stream of type {@code resultType}.
-	 * @param initialValue The initial value of the fold.
-	 * @param function The fold function.
-	 * @return The data stream that is the result of applying the fold function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-		return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Note that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 *
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, inType, null, false);
-
-		return apply(function, resultType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Note that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 *
-	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
-		// try a specialized operator for processing-time tumbling/sliding time windows first
-		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-		// no specialized operator applies: use the generic trigger-based window operator
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		WindowOperator<K, T, R, W> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-
-		} else {
-			operator = new WindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, resultType, operator);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer, unless an evictor is set.
-	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-
-	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, inType, null, false);
-
-		return apply(preAggregator, function, resultType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window for each key individually. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer, unless an evictor is set.
-	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
-	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
-		//clean the closures
-		function = input.getExecutionEnvironment().clean(function);
-		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-		KeySelector<T, K> keySel = input.getKeySelector();
-
-		OneInputStreamOperator<T, R> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-			// NOTE(review): preAggregator is not used on this path, so the window function receives the un-aggregated elements; confirm this is intended.
-		} else {
-			operator = new WindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					keySel,
-					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
-					function,
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, resultType, operator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Aggregations on the keyed windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies an aggregation that sums every window of the data stream at the
-	 * given position.
-	 *
-	 * @param positionToSum The position in the tuple/array to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
-		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that sums every window of the pojo data stream at
-	 * the given field for every window.
-	 *
-	 * <p>
-	 * A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * stream's underlying type. A dot can be used to drill down into objects,
-	 * as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(String field) {
-		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum value of every window
-	 * of the data stream at the given position.
-	 *
-	 * @param positionToMin The position to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum value of the pojo data
-	 * stream at the given field expression for every window.
-	 *
-	 * <p>
-	 * A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(String field) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 *
-	 * @param positionToMinBy
-	 *            The position to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given field expression. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 *
-	 * @param positionToMinBy The field expression to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 *
-	 * @param positionToMinBy The position to minimize
-	 * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @param first If true then in case of field equality the first object will be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum value of every window of
-	 * the data stream at the given position.
-	 *
-	 * @param positionToMax The position to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum value of the pojo data
-	 * stream at the given field expression for every window. A field expression
-	 * is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream DataStream's} underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(String field) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given field expression. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The field expression to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 *
-	 * @param positionToMaxBy The position to maximize by
-	 * @param first If true, then the operator returns the first element with the maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream DataStream's} underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @param first If true then in case of field equality the first object will be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
-	}
-	/** Applies the given aggregation as a window reduce via {@link #reduce(ReduceFunction)}. */
-	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
-		return reduce(aggregator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	/** Adds a specialized operator for processing-time tumbling/sliding time windows (ProcessingTimeTrigger, no evictor) and returns its output stream, or returns {@code null} if not applicable. */
-	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSlide();
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer, input.getKeySelector(), 
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-		} else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner;
-			final long windowLength = timeWindows.getSize();
-			final long windowSlide = timeWindows.getSize(); // tumbling windows: slide equals size
-
-			String opName = "Fast " + timeWindows + " of " + functionName;
-
-			if (function instanceof ReduceFunction) {
-				@SuppressWarnings("unchecked")
-				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
-				@SuppressWarnings("unchecked")
-				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
-						new AggregatingProcessingTimeWindowOperator<>(
-								reducer,
-								input.getKeySelector(),
-								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-								windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-			else if (function instanceof WindowFunction) {
-				@SuppressWarnings("unchecked")
-				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
-				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
-						wf, input.getKeySelector(),
-						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
-						windowLength, windowSlide);
-				return input.transform(opName, resultType, op);
-			}
-		}
-
-		return null;
-	}
-	/** Returns the execution environment of the underlying keyed stream. */
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return input.getExecutionEnvironment();
-	}
-	/** Returns the type information of the input stream's elements. */
-	public TypeInformation<T> getInputType() {
-		return input.getType();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
deleted file mode 100644
index f0bd174..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
- * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
- * Flink cluster in the background and executes the program on that cluster.
- *
- * <p>The parallelism can be set via {@link #setParallelism(int)}; its value determines the number
- * of task slots of the embedded cluster. If no positive parallelism has been set, the embedded
- * cluster is started with a single task slot.
- *
- * <p>Local environments can also be instantiated through {@link StreamExecutionEnvironment#createLocalEnvironment()}
- * and {@link StreamExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
- * default parallelism equal to the number of hardware contexts in the local machine.
- */
-public class LocalStreamEnvironment extends StreamExecutionEnvironment {
-
-	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
-
-	/** The configuration to use for the local cluster; never null. */
-	private final Configuration conf;
-
-	/**
-	 * Creates a new local stream environment that uses the default configuration.
-	 */
-	public LocalStreamEnvironment() {
-		this(null);
-	}
-
-	/**
-	 * Creates a new local stream environment that configures its local executor with the given configuration.
-	 *
-	 * @param config The configuration used to configure the local executor. May be null, in which
-	 *               case an empty configuration is used.
-	 * @throws InvalidProgramException if explicit environments are not allowed in the current context
-	 *                                 (e.g. when the program is submitted through a client).
-	 */
-	public LocalStreamEnvironment(Configuration config) {
-		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
-			throw new InvalidProgramException(
-					"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
-							"or running in a TestEnvironment context.");
-		}
-
-		this.conf = config == null ? new Configuration() : config;
-	}
-
-	/**
-	 * Translates the streaming program into a JobGraph and executes it under the given name on a
-	 * freshly started embedded mini cluster. Once execution finishes, whether or not it succeeds,
-	 * the registered transformations are cleared and the mini cluster is stopped.
-	 *
-	 * @param jobName
-	 *            name of the job
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 * @throws Exception if starting the mini cluster or executing the job fails
-	 */
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		// transform the streaming program into a JobGraph
-		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
-
-		Configuration configuration = new Configuration();
-		configuration.addAll(jobGraph.getJobConfiguration());
-
-		// NOTE(review): -1 presumably lets the TaskManager derive its managed memory size itself — confirm
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		// An unset parallelism is reported as a non-positive sentinel (-1); a non-positive slot
-		// count is meaningless, so start the embedded cluster with at least one slot.
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Math.max(1, getParallelism()));
-
-		// add (and override) the settings with what the user defined
-		configuration.addAll(this.conf);
-
-		LOG.info("Running job on local embedded Flink mini cluster");
-
-		LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
-		try {
-			exec.start();
-			return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
-		}
-		finally {
-			// always reset the program and release the embedded cluster
-			transformations.clear();
-			exec.stop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
deleted file mode 100644
index 02c938e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
-
-	/** The hostname of the JobManager */
-	private final String host;
-
-	/** The port of the JobManager main actor system */
-	private final int port;
-
-	/** The configuration used to parametrize the client that connects to the remote cluster */
-	private final Configuration config;
-
-	/** The jar files that need to be attached to each job */
-	private final List<URL> jarFiles;
-	
-	/** The classpaths that need to be attached to each job */
-	private final List<URL> globalClasspaths;
-
-	/**
-	 * Creates a new RemoteStreamEnvironment that points to the master
-	 * (JobManager) described by the given host name and port.
-	 * 
-	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
-	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
-	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
-	 */
-	public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
-		this(host, port, null, jarFiles);
-	}
-
-	/**
-	 * Creates a new RemoteStreamEnvironment that points to the master
-	 * (JobManager) described by the given host name and port.
-	 *
-	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
-	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
-	 * @param config
-	 *            The configuration used to parametrize the client that connects to the
-	 *            remote cluster.
-	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
-	 */
-	public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
-		this(host, port, config, jarFiles, null);
-	}
-
-	/**
-	 * Creates a new RemoteStreamEnvironment that points to the master
-	 * (JobManager) described by the given host name and port.
-	 *
-	 * @param host
-	 *            The host name or address of the master (JobManager), where the
-	 *            program should be executed.
-	 * @param port
-	 *            The port of the master (JobManager), where the program should
-	 *            be executed.
-	 * @param config
-	 *            The configuration used to parametrize the client that connects to the
-	 *            remote cluster.
-	 * @param jarFiles
-	 *            The JAR files with code that needs to be shipped to the
-	 *            cluster. If the program uses user-defined functions,
-	 *            user-defined input formats, or any libraries, those must be
-	 *            provided in the JAR files.
-	 * @param globalClasspaths 
-	 *            The paths of directories and JAR files that are added to each user code 
-	 *            classloader on all nodes in the cluster. Note that the paths must specify a 
-	 *            protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
-	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
-	 */
-	public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
-		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
-			throw new InvalidProgramException(
-					"The RemoteEnvironment cannot be used when submitting a program through a client, " +
-							"or running in a TestEnvironment context.");
-		}
-		
-		if (host == null) {
-			throw new NullPointerException("Host must not be null.");
-		}
-		if (port < 1 || port >= 0xffff) {
-			throw new IllegalArgumentException("Port out of range");
-		}
-
-		this.host = host;
-		this.port = port;
-		this.config = config == null ? new Configuration() : config;
-		this.jarFiles = new ArrayList<>(jarFiles.length);
-		for (String jarFile : jarFiles) {
-			try {
-				URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
-				this.jarFiles.add(jarFileUrl);
-				JobWithJars.checkJarFile(jarFileUrl);
-			} catch (MalformedURLException e) {
-				throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
-			} catch (IOException e) {
-				throw new RuntimeException("Problem with jar file " + jarFile, e);
-			}
-		}
-		if (globalClasspaths == null) {
-			this.globalClasspaths = Collections.emptyList();
-		}
-		else {
-			this.globalClasspaths = Arrays.asList(globalClasspaths);
-		}
-	}
-
-	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
-		transformations.clear();
-		return executeRemotely(jobGraph);
-	}
-
-	/**
-	 * Executes the remote job.
-	 * 
-	 * @param jobGraph
-	 *            jobGraph to execute
-	 * @return The result of the job execution, containing elapsed time and accumulators.
-	 */
-	private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running remotely at {}:{}", host, port);
-		}
-
-		for (URL jarFile : jarFiles) {
-			try {
-				jobGraph.addJar(new Path(jarFile.toURI()));
-			} catch (URISyntaxException e) {
-				throw new ProgramInvocationException("URL is invalid", e);
-			}
-		}
-
-		ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
-			getClass().getClassLoader());
-		
-		Configuration configuration = new Configuration();
-		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.addAll(this.config);
-		
-		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
-		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
-
-		Client client;
-		try {
-			client = new Client(configuration);
-			client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-		}
-		catch (Exception e) {
-			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
-		}
-
-		try {
-			return client.runBlocking(jobGraph, usercodeClassLoader);
-		}
-		catch (ProgramInvocationException e) {
-			throw e;
-		}
-		catch (Exception e) {
-			String term = e.getMessage() == null ? "." : (": " + e.getMessage());
-			throw new ProgramInvocationException("The program execution failed" + term, e);
-		}
-		finally {
-			client.shutdown();
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
-				+ (getParallelism() == -1 ? "default" : getParallelism()) + ")";
-	}
-
-	/**
-	 * Gets the hostname of the master (JobManager), where the
-	 * program will be executed.
-	 *
-	 * @return The hostname of the master
-	 */
-	public String getHost() {
-		return host;
-	}
-
-	/**
-	 * Gets the port of the master (JobManager), where the
-	 * program will be executed.
-	 *
-	 * @return The port of the master
-	 */
-	public int getPort() {
-		return port;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
deleted file mode 100644
index b2a5435..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import java.net.URL;
-import java.util.List;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link StreamExecutionEnvironment} that submits the streaming program through a pre-configured
- * {@link Client}. The given jar files and classpaths are attached to every job graph it submits, and
- * the job is run either blocking or detached depending on the {@code wait} flag.
- */
-public class StreamContextEnvironment extends StreamExecutionEnvironment {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
-
-	/** Client through which job graphs are submitted. */
-	private final Client client;
-
-	/** Jar files attached to every submitted job graph. */
-	private final List<URL> jars;
-
-	/** Classpaths set on every submitted job graph. */
-	private final List<URL> classpaths;
-
-	/** Class loader built from {@link #jars} and {@link #classpaths}, passed to the client on submission. */
-	private final ClassLoader userCodeClassLoader;
-
-	/** If true, submission blocks until the job finishes; otherwise the job is submitted detached. */
-	private final boolean wait;
-
-	/**
-	 * Creates an environment that submits jobs through the given client.
-	 *
-	 * @param client      client used for job submission
-	 * @param jars        jar files attached to each job graph
-	 * @param classpaths  classpaths set on each job graph
-	 * @param parallelism parallelism to use; a non-positive value selects the globally configured default
-	 * @param wait        whether to block until job completion ({@code true}) or submit detached
-	 */
-	protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
-			boolean wait) {
-		this.client = client;
-		this.jars = jars;
-		this.classpaths = classpaths;
-		this.wait = wait;
-		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths, getClass().getClassLoader());
-
-		// non-positive means "unspecified": fall back to the default from the global configuration
-		final int effectiveParallelism = parallelism > 0
-				? parallelism
-				: GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, ConfigConstants.DEFAULT_PARALLELISM);
-		setParallelism(effectiveParallelism);
-	}
-
-	/**
-	 * Executes the program without an explicit job name.
-	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute(null);
-	}
-
-	/**
-	 * Translates the program into a job graph, attaches the jar files and classpaths, and submits it.
-	 *
-	 * @param jobName name of the job, or null to let the stream graph pick its own name
-	 * @return the execution result; in detached mode this is derived from the submission result only
-	 */
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		final JobGraph jobGraph = (jobName == null)
-				? this.getStreamGraph().getJobGraph()
-				: this.getStreamGraph().getJobGraph(jobName);
-
-		transformations.clear();
-
-		for (URL jar : jars) {
-			jobGraph.addJar(new Path(jar.toURI()));
-		}
-		jobGraph.setClasspaths(classpaths);
-
-		if (!wait) {
-			JobSubmissionResult submission = client.runDetached(jobGraph, userCodeClassLoader);
-			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
-			return JobExecutionResult.fromJobSubmissionResult(submission);
-		}
-		return client.runBlocking(jobGraph, userCodeClassLoader);
-	}
-}