You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:57 UTC
[41/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
deleted file mode 100644
index 033e84f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * A {@code WindowedStream} represents a data stream where elements are grouped by
- * key, and for each key, the stream of elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
- *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code WindowedStream} is purely an API construct, during runtime
- * the {@code WindowedStream} will be collapsed together with the
- * {@code KeyedStream} and the operation over the window into one single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <K> The type of the key by which elements are grouped.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class WindowedStream<T, K, W extends Window> {
-
- /** The keyed data stream that is windowed by this stream */
- private final KeyedStream<T, K> input;
-
- /** The window assigner */
- private final WindowAssigner<? super T, W> windowAssigner;
-
- /** The trigger that is used for window evaluation/emission. */
- private Trigger<? super T, ? super W> trigger;
-
- /** The evictor that is used for evicting elements before window evaluation. */
- private Evictor<? super T, ? super W> evictor;
-
-
- public WindowedStream(KeyedStream<T, K> input,
- WindowAssigner<? super T, W> windowAssigner) {
- this.input = input;
- this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
- }
-
- /**
- * Sets the {@code Trigger} that should be used to trigger window emission.
- */
- public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
- this.trigger = trigger;
- return this;
- }
-
- /**
- * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
- *
- * <p>
- * Note: When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- */
- public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
- this.evictor = evictor;
- return this;
- }
-
-
- // ------------------------------------------------------------------------
- // Operations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies a reduce function to the window. The window function is called for each evaluation
- * of the window for each key individually. The output of the reduce function is interpreted
- * as a regular non-windowed stream.
- * <p>
- * This window will try and pre-aggregate data as much as the window policies permit. For example,
- * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
- * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
- * so a few elements are stored per key (one per slide interval).
- * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
- * aggregation tree.
- *
- * @param function The reduce function.
- * @return The data stream that is the result of applying the reduce function to the window.
- */
- public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "Reduce at " + callLocation;
-
- SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
- if (result != null) {
- return result;
- }
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, T> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- new ReduceWindowFunction<K, W, T>(function),
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
- operator = new WindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
- new ReduceWindowFunction<K, W, T>(function),
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, input.getType(), operator);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
- Utils.getCallLocationName(), true);
-
- return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
- }
-
- /**
- * Applies the given fold function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the fold function is
- * interpreted as a regular non-windowed stream.
- *
- * @param function The fold function.
- * @return The data stream that is the result of applying the fold function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
- return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, inType, null, false);
-
- return apply(function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Note that this function requires that all data in the windows is buffered until the window
- * is evaluated, as the function provides no means of pre-aggregation.
- *
- * @param function The window function.
- * @param resultType Type information for the result type of the window function
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
- //clean the closure
- function = input.getExecutionEnvironment().clean(function);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowApply at " + callLocation;
-
- SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
- if (result != null) {
- return result;
- }
-
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- WindowOperator<K, T, R, W> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- operator = new WindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, resultType, operator);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @return The data stream that is the result of applying the window function to the window.
- */
-
- public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function) {
- TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, inType, null, false);
-
- return apply(preAggregator, function, resultType);
- }
-
- /**
- * Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
- * interpreted as a regular non-windowed stream.
- *
- * <p>
- * Arriving data is pre-aggregated using the given pre-aggregation reducer.
- *
- * @param preAggregator The reduce function that is used for pre-aggregation
- * @param function The window function.
- * @param resultType Type information for the result type of the window function
- * @return The data stream that is the result of applying the window function to the window.
- */
- public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
- //clean the closures
- function = input.getExecutionEnvironment().clean(function);
- preAggregator = input.getExecutionEnvironment().clean(preAggregator);
-
- String callLocation = Utils.getCallLocationName();
- String udfName = "WindowApply at " + callLocation;
-
- String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- KeySelector<T, K> keySel = input.getKeySelector();
-
- OneInputStreamOperator<T, R> operator;
-
- boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
- if (evictor != null) {
- operator = new EvictingWindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new HeapWindowBuffer.Factory<T>(),
- function,
- trigger,
- evictor).enableSetProcessingTime(setProcessingTime);
-
- } else {
- operator = new WindowOperator<>(windowAssigner,
- windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- keySel,
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
- function,
- trigger).enableSetProcessingTime(setProcessingTime);
- }
-
- return input.transform(opName, resultType, operator);
- }
-
- // ------------------------------------------------------------------------
- // Aggregations on the keyed windows
- // ------------------------------------------------------------------------
-
- /**
- * Applies an aggregation that sums every window of the data stream at the
- * given position.
- *
- * @param positionToSum The position in the tuple/array to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
- return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that sums every window of the pojo data stream at
- * the given field for every window.
- *
- * <p>
- * A field expression is either
- * the name of a public field or a getter method with parentheses of the
- * stream's underlying type. A dot can be used to drill down into objects,
- * as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field to sum
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> sum(String field) {
- return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of every window
- * of the data stream at the given position.
- *
- * @param positionToMin The position to minimize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
- return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum value of the pojo data
- * stream at the given field expression for every window.
- *
- * <p>
- * A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> min(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy
- * The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns the first element by default.
- *
- * @param positionToMinBy The position to minimize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
- return this.minBy(positionToMinBy, true);
- }
-
- /**
- * Applies an aggregation that gives the minimum element of every window of
- * the data stream by the given position. If more elements have the same
- * minimum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMinBy The position to minimize
- * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the minimum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream DataStreams} underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of every window of
- * the data stream at the given position.
- *
- * @param positionToMax The position to maximize
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
- return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum value of the pojo data
- * stream at the given field expression for every window. A field expression
- * is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
- * down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> max(String field) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns the first by default.
- *
- * @param positionToMaxBy
- * The position to maximize by
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
- return this.maxBy(positionToMaxBy, true);
- }
-
- /**
- * Applies an aggregation that gives the maximum element of every window of
- * the data stream by the given position. If more elements have the same
- * maximum value the operator returns either the first or last one depending
- * on the parameter setting.
- *
- * @param positionToMaxBy The position to maximize by
- * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
- return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- /**
- * Applies an aggregation that gives the maximum element of the pojo
- * data stream by the given field expression for every window. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field The field expression based on which the aggregation will be applied.
- * @param first If True then in case of field equality the first object will be returned
- * @return The transformed DataStream.
- */
- public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
- return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
- }
-
- private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
- return reduce(aggregator);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
- Function function,
- TypeInformation<R> resultType,
- String functionName) {
-
- if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSlide();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- } else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner;
- final long windowLength = timeWindows.getSize();
- final long windowSlide = timeWindows.getSize();
-
- String opName = "Fast " + timeWindows + " of " + functionName;
-
- if (function instanceof ReduceFunction) {
- @SuppressWarnings("unchecked")
- ReduceFunction<T> reducer = (ReduceFunction<T>) function;
-
- @SuppressWarnings("unchecked")
- OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
- new AggregatingProcessingTimeWindowOperator<>(
- reducer,
- input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- else if (function instanceof WindowFunction) {
- @SuppressWarnings("unchecked")
- WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>) function;
-
- OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
- wf, input.getKeySelector(),
- input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- input.getType().createSerializer(getExecutionEnvironment().getConfig()),
- windowLength, windowSlide);
- return input.transform(opName, resultType, op);
- }
- }
-
- return null;
- }
-
- public StreamExecutionEnvironment getExecutionEnvironment() {
- return input.getExecutionEnvironment();
- }
-
- public TypeInformation<T> getInputType() {
- return input.getType();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
deleted file mode 100644
index f0bd174..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The LocalStreamEnvironment is a StreamExecutionEnvironment that runs the program locally,
- * multi-threaded, in the JVM where the environment is instantiated. It spawns an embedded
- * Flink cluster in the background and executes the program on that cluster.
- *
- * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
- * parallelism can be set via {@link #setParallelism(int)}.
- *
- * <p>Local environments can also be instantiated through {@link StreamExecutionEnvironment#createLocalEnvironment()}
- * and {@link StreamExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
- * default parallelism equal to the number of hardware contexts in the local machine.
- */
-public class LocalStreamEnvironment extends StreamExecutionEnvironment {
-
- private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
-
- /** The configuration to use for the local cluster */
- private final Configuration conf;
-
- /**
- * Creates a new local stream environment that uses the default configuration.
- */
- public LocalStreamEnvironment() {
- this(null);
- }
-
- /**
- * Creates a new local stream environment that configures its local executor with the given configuration.
- *
- * @param config The configuration used to configure the local executor.
- */
- public LocalStreamEnvironment(Configuration config) {
- if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
- throw new InvalidProgramException(
- "The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
- "or running in a TestEnvironment context.");
- }
-
- this.conf = config == null ? new Configuration() : config;
- }
-
- /**
- * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
- * specified name.
- *
- * @param jobName
- * name of the job
- * @return The result of the job execution, containing elapsed time and accumulators.
- */
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- // transform the streaming program into a JobGraph
- JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
-
- Configuration configuration = new Configuration();
- configuration.addAll(jobGraph.getJobConfiguration());
-
- configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
-
- // add (and override) the settings with what the user defined
- configuration.addAll(this.conf);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Running job on local embedded Flink mini cluster");
- }
-
- LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
- try {
- exec.start();
- return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
- }
- finally {
- transformations.clear();
- exec.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
deleted file mode 100644
index 02c938e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
-
- private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
-
- /** The hostname of the JobManager */
- private final String host;
-
- /** The port of the JobManager main actor system */
- private final int port;
-
- /** The configuration used to parametrize the client that connects to the remote cluster */
- private final Configuration config;
-
- /** The jar files that need to be attached to each job */
- private final List<URL> jarFiles;
-
- /** The classpaths that need to be attached to each job */
- private final List<URL> globalClasspaths;
-
- /**
- * Creates a new RemoteStreamEnvironment that points to the master
- * (JobManager) described by the given host name and port.
- *
- * @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
- * @param port
- * The port of the master (JobManager), where the program should
- * be executed.
- * @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
- */
- public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
- this(host, port, null, jarFiles);
- }
-
- /**
- * Creates a new RemoteStreamEnvironment that points to the master
- * (JobManager) described by the given host name and port.
- *
- * @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
- * @param port
- * The port of the master (JobManager), where the program should
- * be executed.
- * @param config
- * The configuration used to parametrize the client that connects to the
- * remote cluster.
- * @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
- */
- public RemoteStreamEnvironment(String host, int port, Configuration config, String... jarFiles) {
- this(host, port, config, jarFiles, null);
- }
-
- /**
- * Creates a new RemoteStreamEnvironment that points to the master
- * (JobManager) described by the given host name and port.
- *
- * @param host
- * The host name or address of the master (JobManager), where the
- * program should be executed.
- * @param port
- * The port of the master (JobManager), where the program should
- * be executed.
- * @param config
- * The configuration used to parametrize the client that connects to the
- * remote cluster.
- * @param jarFiles
- * The JAR files with code that needs to be shipped to the
- * cluster. If the program uses user-defined functions,
- * user-defined input formats, or any libraries, those must be
- * provided in the JAR files.
- * @param globalClasspaths
- * The paths of directories and JAR files that are added to each user code
- * classloader on all nodes in the cluster. Note that the paths must specify a
- * protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
- * The protocol must be supported by the {@link java.net.URLClassLoader}.
- */
- public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
- if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
- throw new InvalidProgramException(
- "The RemoteEnvironment cannot be used when submitting a program through a client, " +
- "or running in a TestEnvironment context.");
- }
-
- if (host == null) {
- throw new NullPointerException("Host must not be null.");
- }
- if (port < 1 || port >= 0xffff) {
- throw new IllegalArgumentException("Port out of range");
- }
-
- this.host = host;
- this.port = port;
- this.config = config == null ? new Configuration() : config;
- this.jarFiles = new ArrayList<>(jarFiles.length);
- for (String jarFile : jarFiles) {
- try {
- URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
- this.jarFiles.add(jarFileUrl);
- JobWithJars.checkJarFile(jarFileUrl);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
- } catch (IOException e) {
- throw new RuntimeException("Problem with jar file " + jarFile, e);
- }
- }
- if (globalClasspaths == null) {
- this.globalClasspaths = Collections.emptyList();
- }
- else {
- this.globalClasspaths = Arrays.asList(globalClasspaths);
- }
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
- JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
- transformations.clear();
- return executeRemotely(jobGraph);
- }
-
- /**
- * Executes the remote job.
- *
- * @param jobGraph
- * jobGraph to execute
- * @return The result of the job execution, containing elapsed time and accumulators.
- */
- private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
- if (LOG.isInfoEnabled()) {
- LOG.info("Running remotely at {}:{}", host, port);
- }
-
- for (URL jarFile : jarFiles) {
- try {
- jobGraph.addJar(new Path(jarFile.toURI()));
- } catch (URISyntaxException e) {
- throw new ProgramInvocationException("URL is invalid", e);
- }
- }
-
- ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
- getClass().getClassLoader());
-
- Configuration configuration = new Configuration();
- configuration.addAll(jobGraph.getJobConfiguration());
- configuration.addAll(this.config);
-
- configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
- configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
-
- Client client;
- try {
- client = new Client(configuration);
- client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
- }
- catch (Exception e) {
- throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
- }
-
- try {
- return client.runBlocking(jobGraph, usercodeClassLoader);
- }
- catch (ProgramInvocationException e) {
- throw e;
- }
- catch (Exception e) {
- String term = e.getMessage() == null ? "." : (": " + e.getMessage());
- throw new ProgramInvocationException("The program execution failed" + term, e);
- }
- finally {
- client.shutdown();
- }
- }
-
- @Override
- public String toString() {
- return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
- + (getParallelism() == -1 ? "default" : getParallelism()) + ")";
- }
-
- /**
- * Gets the hostname of the master (JobManager), where the
- * program will be executed.
- *
- * @return The hostname of the master
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Gets the port of the master (JobManager), where the
- * program will be executed.
- *
- * @return The port of the master
- */
- public int getPort() {
- return port;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
deleted file mode 100644
index b2a5435..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import java.net.URL;
-import java.util.List;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamContextEnvironment extends StreamExecutionEnvironment {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
-
- private final List<URL> jars;
-
- private final List<URL> classpaths;
-
- private final Client client;
-
- private final ClassLoader userCodeClassLoader;
-
- private final boolean wait;
-
- protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
- boolean wait) {
- this.client = client;
- this.jars = jars;
- this.classpaths = classpaths;
- this.wait = wait;
-
- this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
- getClass().getClassLoader());
-
- if (parallelism > 0) {
- setParallelism(parallelism);
- }
- else {
- // determine parallelism
- setParallelism(GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELISM_KEY,
- ConfigConstants.DEFAULT_PARALLELISM));
- }
- }
-
- @Override
- public JobExecutionResult execute() throws Exception {
- return execute(null);
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
-
- JobGraph jobGraph;
- if (jobName == null) {
- jobGraph = this.getStreamGraph().getJobGraph();
- } else {
- jobGraph = this.getStreamGraph().getJobGraph(jobName);
- }
-
- transformations.clear();
-
- // attach all necessary jar files to the JobGraph
- for (URL file : jars) {
- jobGraph.addJar(new Path(file.toURI()));
- }
-
- jobGraph.setClasspaths(classpaths);
-
- // execute the programs
- if (wait) {
- return client.runBlocking(jobGraph, userCodeClassLoader);
- } else {
- JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
- LOG.warn("Job was executed in detached mode, the results will be available on completion.");
- return JobExecutionResult.fromJobSubmissionResult(result);
- }
- }
-}