You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:43 UTC
[07/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and
API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
deleted file mode 100644
index 529850f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<T> typeSerializer;
- TypeSerializer<Integer> intSerializer = IntSerializer.INSTANCE;
- TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
-
- public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
- Preconditions.checkNotNull(typeInfo);
-
- this.typeSerializer = typeInfo.createSerializer(conf);
- }
-
- public TypeSerializer<T> getObjectSerializer() {
- return typeSerializer;
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public StreamWindow<T> createInstance() {
- return new StreamWindow<T>(0, 0);
- }
-
- @Override
- public StreamWindow<T> copy(StreamWindow<T> from) {
- return new StreamWindow<T>(from, typeSerializer);
- }
-
- @Override
- public StreamWindow<T> copy(StreamWindow<T> from, StreamWindow<T> reuse) {
- reuse.clear();
- reuse.windowID = from.windowID;
- reuse.numberOfParts = from.numberOfParts;
- for (T element : from) {
- reuse.add(typeSerializer.copy(element));
- }
- return reuse;
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(StreamWindow<T> window, DataOutputView target) throws IOException {
-
- intSerializer.serialize(window.windowID, target);
- intSerializer.serialize(window.numberOfParts, target);
-
- intSerializer.serialize(window.size(), target);
-
- for (T element : window) {
- typeSerializer.serialize(element, target);
- }
- }
-
- @Override
- public StreamWindow<T> deserialize(DataInputView source) throws IOException {
- return deserialize(createInstance(), source);
- }
-
- @Override
- public StreamWindow<T> deserialize(StreamWindow<T> reuse, DataInputView source)
- throws IOException {
-
- StreamWindow<T> window = reuse;
- window.clear();
-
- window.windowID = intSerializer.deserialize(source);
- window.numberOfParts = intSerializer.deserialize(source);
-
- int size = intSerializer.deserialize(source);
-
- for (int i = 0; i < size; i++) {
- window.add(typeSerializer.deserialize(source));
- }
-
- return window;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- serialize(deserialize(source), target);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof StreamWindowSerializer) {
- StreamWindowSerializer<?> other = (StreamWindowSerializer<?>) obj;
-
- return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof StreamWindowSerializer;
- }
-
- @Override
- public int hashCode() {
- return typeSerializer.hashCode();
- }
-
- @Override
- public TypeSerializer<StreamWindow<T>> duplicate() {
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
deleted file mode 100644
index 2c0a999..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
-
- final TypeInformation<T> innerType;
-
- public StreamWindowTypeInfo(TypeInformation<T> innerType) {
- this.innerType = Preconditions.checkNotNull(innerType);
- }
-
- public TypeInformation<T> getInnerType() {
- return innerType;
- }
-
- @Override
- public boolean isBasicType() {
- return innerType.isBasicType();
- }
-
- @Override
- public boolean isTupleType() {
- return innerType.isTupleType();
- }
-
- @Override
- public int getArity() {
- return innerType.getArity();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Class<StreamWindow<T>> getTypeClass() {
- return (Class<StreamWindow<T>>)(Object)StreamWindow.class;
- }
-
- @Override
- public boolean isKeyType() {
- return innerType.isKeyType();
- }
-
- @Override
- public TypeSerializer<StreamWindow<T>> createSerializer(ExecutionConfig conf) {
- return new StreamWindowSerializer<T>(innerType, conf);
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "<" + innerType + ">";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof StreamWindowTypeInfo) {
- @SuppressWarnings("unchecked")
- StreamWindowTypeInfo<T> streamWindowTypeInfo = (StreamWindowTypeInfo<T>) obj;
-
- return streamWindowTypeInfo.canEqual(this) &&
- innerType.equals(streamWindowTypeInfo.innerType);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return innerType.hashCode();
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof StreamWindowTypeInfo;
- }
-
- @Override
- public int getTotalFields() {
- return innerType.getTotalFields();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
deleted file mode 100644
index 359dfb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Type representing events sent to the window buffer. The first field should
- * contain the window element the second field encodes triggers and evictions if
- * the second field is greater than 0 it represents an eviction if it equals -1
- * it represents a trigger.
- */
-public class WindowEvent<T> extends Tuple2<T, Integer> {
- private static final long serialVersionUID = 1L;
-
- public boolean isElement() {
- return f1 == 0;
- }
-
- public boolean isEviction() {
- return f1 > 0;
- }
-
- public boolean isTrigger() {
- return f1 == -1;
- }
-
- public Integer getEviction() {
- return f1;
- }
-
- public T getElement() {
- return f0;
- }
-
- public WindowEvent<T> setElement(T element) {
- f0 = element;
- f1 = 0;
- return this;
- }
-
- public WindowEvent<T> setTrigger() {
- f1 = -1;
- return this;
- }
-
- public WindowEvent<T> setEviction(Integer n) {
- if (n > 0) {
- f1 = n;
- return this;
- } else {
- throw new RuntimeException("Must evict at least 1");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
deleted file mode 100644
index a899b74..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-
-/**
- * Utility class that contains helper methods to work with stream windowing.
- */
-public class WindowUtils {
-
- public enum WindowTransformation {
- REDUCEWINDOW, MAPWINDOW, FOLDWINDOW, NONE;
- private Function UDF;
-
- public WindowTransformation with(Function UDF) {
- this.UDF = UDF;
- return this;
- }
-
- public Function getUDF() {
- return UDF;
- }
- }
-
- public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
- int parallelism) {
- return ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
- || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy) || (WindowUtils
- .isTimeOnly(trigger, eviction) && parallelism > 1));
- }
-
- public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- if (isTimeOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide < window
- && getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
- } else {
- return false;
- }
- }
-
- public static boolean isSlidingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- if (isCountOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide < window
- && ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
- .getStart()
- && ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
- } else {
- return false;
- }
- }
-
- public static <X> TimestampWrapper<X> getTimeStampWrapper(TriggerPolicy<X> trigger) {
- if (trigger instanceof TimeTriggerPolicy) {
- return ((TimeTriggerPolicy<X>) trigger).getTimeStampWrapper();
- } else {
- throw new IllegalArgumentException(
- "Timestamp wrapper can only be accessed for time policies");
- }
- }
-
- public static <X> TimestampWrapper<X> getTimeStampWrapper(EvictionPolicy<X> eviction) {
- if (eviction instanceof EvictionPolicy) {
- return ((TimeEvictionPolicy<X>) eviction).getTimeStampWrapper();
- } else {
- throw new IllegalArgumentException(
- "Timestamp wrapper can only be accessed for time policies");
- }
- }
-
- public static long getSlideSize(TriggerPolicy<?> trigger) {
- if (trigger instanceof TimeTriggerPolicy) {
- return ((TimeTriggerPolicy<?>) trigger).getSlideSize();
- } else if (trigger instanceof CountTriggerPolicy) {
- return ((CountTriggerPolicy<?>) trigger).getSlideSize();
- } else {
- throw new IllegalArgumentException(
- "Slide size can only be accessed for time or count policies");
- }
- }
-
- public static long getWindowSize(EvictionPolicy<?> eviction) {
- if (eviction instanceof TimeEvictionPolicy) {
- return ((TimeEvictionPolicy<?>) eviction).getWindowSize();
- } else if (eviction instanceof CountEvictionPolicy) {
- return ((CountEvictionPolicy<?>) eviction).getWindowSize();
- } else {
- throw new IllegalArgumentException(
- "Window size can only be accessed for time or count policies");
- }
- }
-
- public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- if (eviction instanceof TumblingEvictionPolicy || eviction instanceof KeepAllEvictionPolicy) {
- return true;
- } else if (isTimeOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide == window
- && getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
- } else if (isCountOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide == window
- && ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
- .getStart()
- && ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
- } else {
- return false;
- }
- }
-
- public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- return trigger instanceof TimeTriggerPolicy
- && (eviction instanceof TimeEvictionPolicy || eviction instanceof KeepAllEvictionPolicy);
- }
-
- public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- return trigger instanceof CountTriggerPolicy && eviction instanceof CountEvictionPolicy;
- }
-
- public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
- return trigger instanceof TimeTriggerPolicy
- && ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
- }
-
- public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(StreamWindow<R> value) throws Exception {
- return value.windowID;
- }
-
- }
-
- public static boolean isJumpingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- if (isCountOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide > window
- && ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
- .getStart()
- && ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
- } else {
- return false;
- }
- }
-
- public static boolean isJumpingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
- if (isTimeOnly(trigger, eviction)) {
- long slide = getSlideSize(trigger);
- long window = getWindowSize(eviction);
-
- return slide > window
- && getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
- } else {
- return false;
- }
- }
-
- /**
- * Private constructor to prevent instantiation.
- */
- private WindowUtils() {
- throw new RuntimeException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 5004c42..5776d8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing.evictors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -40,7 +41,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
@Override
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
- long currentTime = System.currentTimeMillis();
+ long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
deleted file mode 100644
index ee878ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Converts a Tuple to an Object-Array. The field which should be included in
- * the array can selected and reordered as needed.
- */
-public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
-
- /**
- * Auto generated version id
- */
- private static final long serialVersionUID = -6076121226427616818L;
- int[] order = null;
-
- /**
- * Using this constructor the extractor will convert the whole tuple (all
- * fields in the original order) to an array.
- */
- public ArrayFromTuple() {
- // noting to do
- }
-
- /**
- * Using this constructor the extractor will combine the fields as specified
- * in the indexes parameter in an object array.
- *
- * @param indexes
- * the field ids (enumerated from 0)
- */
- public ArrayFromTuple(int... indexes) {
- this.order = indexes;
- }
-
- @Override
- public Object[] extract(Tuple in) {
- Object[] output;
-
- if (order == null) {
- // copy the whole tuple
- output = new Object[in.getArity()];
- for (int i = 0; i < in.getArity(); i++) {
- output[i] = in.getField(i);
- }
- } else {
- // copy user specified order
- output = new Object[order.length];
- for (int i = 0; i < order.length; i++) {
- output[i] = in.getField(order[i]);
- }
- }
-
- return output;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
deleted file mode 100644
index a220abe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-/**
- * Combines two extractors which will be executed one after each other.
- *
- * @param <FROM>
- * The input type of the first extractor.
- * @param <OVER>
- * The output type of the first and the input type of the second
- * extractor.
- * @param <TO>
- * The output type of the second extractor and the output type of the
- * over all extraction.
- */
-public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
-
- /**
- * auto-generated id
- */
- private static final long serialVersionUID = -7807197760725651752L;
-
- private Extractor<FROM, OVER> e1;
- private Extractor<OVER, TO> e2;
-
- /**
- * Combines two extractors which will be executed one after each other.
- *
- * @param e1
- * First extractor: This extractor gets applied to the input data
- * first. Its output as then passed as input to the second
- * extractor.
- * @param e2
- * Second extractor: This extractor gets the output of the first
- * extractor as input. Its output is then the result of the over
- * all extraction.
- */
- public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
- this.e1 = e1;
- this.e2 = e2;
- }
-
- @Override
- public TO extract(FROM in) {
- return e2.extract(e1.extract(in));
- }
-
- public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
- return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
deleted file mode 100644
index b103ca3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.io.Serializable;
-
-/**
- * Extractors allow to extract/convert one type to another. They are mostly used
- * to extract some fields out of a more complex structure (Tuple/Array) to run
- * further calculation on the extraction result.
- *
- * @param <FROM>
- * The input data type.
- * @param <TO>
- * The output data type.
- */
-public interface Extractor<FROM, TO> extends Serializable {
-
- /**
- * Extracts/Converts the given input to an object of the output type
- *
- * @param in
- * the input data
- * @return the extracted/converted data
- */
- public TO extract(FROM in);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
deleted file mode 100644
index 0568276..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts a single field out of an array.
- *
- * @param <OUT>
- * The type of the extracted field.
- */
-public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
-
- /**
- * Auto-gernated version id
- */
- private static final long serialVersionUID = -5161386546695574359L;
- private int fieldId = 0;
-
- /**
- * Extracts the first field (id 0) from the array
- */
- public FieldFromArray() {
- // noting to do => will use default 0
- }
-
- /**
- * Extracts the field with the given id from the array.
- *
- * @param fieldId
- * The id of the field which will be extracted from the array.
- */
- public FieldFromArray(int fieldId) {
- this.fieldId = fieldId;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public OUT extract(Object in) {
- return (OUT) Array.get(in, fieldId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
deleted file mode 100644
index 07b38f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts a single field out of a tuple.
- *
- * @param <OUT>
- * The type of the extracted field.
- */
-public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
-
- /**
- * Auto-gernated version id
- */
- private static final long serialVersionUID = -5161386546695574359L;
- private int fieldId = 0;
-
- /**
- * Extracts the first field (id 0) from the tuple
- */
- public FieldFromTuple() {
- // noting to do => will use default 0
- }
-
- /**
- * Extracts the field with the given id from the tuple.
- *
- * @param fieldId
- * The id of the field which will be extracted from the tuple.
- */
- public FieldFromTuple(int fieldId) {
- this.fieldId = fieldId;
- }
-
- @Override
- public OUT extract(Tuple in) {
- return in.getField(fieldId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
deleted file mode 100644
index 4e98689..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts multiple fields from an array and puts them into a new array of the
- * specified type.
- *
- * @param <OUT>
- * The type of the output array. If out is set to String, the output
- * of the extractor will be a String[]. If it is set to String[] the
- * output will be String[][].
- */
-public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
-
- /**
- * Auto-generated version id
- */
- private static final long serialVersionUID = 8075055384516397670L;
- private int[] order;
- private Class<OUT> clazz;
-
- /**
- * Extracts multiple fields from an array and puts them in the given order
- * into a new array of the specified type.
- *
- * @param clazz
- * the Class object representing the component type of the new
- * array
- * @param indexes
- * The indexes of the fields to be extracted. Any order is
- * possible, but not more than 255 fields due to limitations in
- * {@link Array#newInstance(Class, int...)}.
- */
- public FieldsFromArray(Class<OUT> clazz, int... indexes) {
- this.order = indexes;
- this.clazz = clazz;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public OUT[] extract(Object in) {
- OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
- for (int i = 0; i < order.length; i++) {
- output[i] = (OUT) Array.get(in, this.order[i]);
- }
- return output;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
deleted file mode 100644
index 1bfc461..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts one or more fields of the type Double from a tuple and puts them
- * into a new double[]
- */
-public class FieldsFromTuple implements Extractor<Tuple, double[]> {
-
- /**
- * auto generated version id
- */
- private static final long serialVersionUID = -2554079091050273761L;
- int[] indexes;
-
- /**
- * Extracts one or more fields of the the type Double from a tuple and puts
- * them into a new double[] (in the specified order).
- *
- * @param indexes
- * The indexes of the fields to be extracted.
- */
- public FieldsFromTuple(int... indexes) {
- this.indexes = indexes;
- }
-
- @Override
- public double[] extract(Tuple in) {
- double[] out = new double[indexes.length];
- for (int i = 0; i < indexes.length; i++) {
- out[i] = (Double) in.getField(indexes[i]);
- }
- return out;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
deleted file mode 100644
index 3266a24..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Represents a count based trigger or eviction policy. Use the
- * {@link Count#of(int)} to get an instance.
- */
-@SuppressWarnings("rawtypes")
-public class Count extends WindowingHelper {
-
- private int count;
- private int deleteOnEviction = 1;
- private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
-
- /**
- * Specifies on which element a trigger or an eviction should happen (based
- * on the count of the elements).
- *
- * This constructor does exactly the same as {@link Count#of(int)}.
- *
- * @param count
- * the number of elements to count before trigger/evict
- */
- public Count(int count) {
- this.count = count;
- }
-
- @Override
- public EvictionPolicy<?> toEvict() {
- return new CountEvictionPolicy(count, deleteOnEviction);
- }
-
- @Override
- public TriggerPolicy<?> toTrigger() {
- return new CountTriggerPolicy(count, startValue);
- }
-
- /**
- * Sets the number of elements deleted at each eviction (i.e when the number
- * elements exceeds the window size). By default the elements get deleted
- * one by one (deleteOnEvition = 1)
- *
- * @param deleteOnEviction
- * The number of elements deleted at each evition
- * @return Helper representing the count based policy
- *
- */
- public Count withDelete(int deleteOnEviction) {
- this.deleteOnEviction = deleteOnEviction;
- return this;
- }
-
- /**
- * Sets the initial value of the counter. 0 by default
- *
- * @param startValue
- * Starting value of the window counter
- * @return Helper representing the count based policy
- *
- */
- public Count startingAt(int startValue) {
- this.startValue = startValue;
- return this;
- }
-
- /**
- * Specifies a count based eviction (window size) or trigger policy (slide
- * size). For eviction 'count' defines the number of elements in each
- * window. For trigger 'count' defines how often do we call the user
- * function in terms of number of elements received.
- *
- * @param count
- * the number of elements to count before trigger/evict
- * @return Helper representing the count based policy
- */
- public static Count of(int count) {
- return new Count(count);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
deleted file mode 100644
index 31063ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a trigger or eviction policy based on a
- * {@link DeltaFunction}.
- *
- * @param <DATA>
- * the data type handled by the delta function represented by this
- * helper.
- */
-public class Delta<DATA> extends WindowingHelper<DATA> {
-
- private DeltaFunction<DATA> deltaFunction;
- private DATA initVal;
- private double threshold;
- private TypeSerializer<DATA> typeSerializer;
-
- /**
- * Creates a delta helper representing a delta count or eviction policy
- * @param deltaFunction
- * The delta function which should be used to calculate the delta
- * points.
- * @param initVal
- * The initial value which will be used to calculate the first
- * delta.
- * @param threshold
- * The threshold used by the delta function.
- */
- public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
- this.deltaFunction = deltaFunction;
- this.initVal = initVal;
- this.threshold = threshold;
- }
-
- @Override
- public EvictionPolicy<DATA> toEvict() {
- instantiateTypeSerializer();
- return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
- }
-
- @Override
- public TriggerPolicy<DATA> toTrigger() {
- instantiateTypeSerializer();
- return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
- }
-
- /**
- * Creates a delta helper representing a delta trigger or eviction policy.
- * </br></br> This policy calculates a delta between the data point which
- * triggered last and the currently arrived data point. It triggers if the
- * delta is higher than a specified threshold. </br></br> In case it gets
- * used for eviction, this policy starts from the first element of the
- * buffer and removes all elements from the buffer which have a higher delta
- * then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- *
- * @param deltaFunction
- * The delta function which should be used to calculate the delta
- * points.
- * @param initVal
- * The initial value which will be used to calculate the first
- * delta.
- * @param threshold
- * The threshold used by the delta function.
- * @return Helper representing a delta trigger or eviction policy
- */
- public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
- DATA initVal) {
- return new Delta<DATA>(deltaFunction, initVal, threshold);
- }
-
- @SuppressWarnings("unchecked")
- private void instantiateTypeSerializer(){
- if (executionConfig == null){
- throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");
- }
- TypeInformation typeInformation = TypeExtractor.getForObject(initVal);
- typeSerializer = typeInformation.createSerializer(executionConfig);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
deleted file mode 100644
index 7773d9a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Window that represents the full stream history. Can be used only as eviction
- * policy and only with operations that support pre-aggregator such as reduce or
- * aggregations.
- */
-public class FullStream<DATA> extends WindowingHelper<DATA> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private FullStream() {
- }
-
- @Override
- public EvictionPolicy<DATA> toEvict() {
- return new KeepAllEvictionPolicy<DATA>();
- }
-
- @Override
- public TriggerPolicy<DATA> toTrigger() {
- throw new RuntimeException(
- "Full stream policy can be only used as eviction. Use .every(..) after the window call.");
- }
-
- /**
- * Returns a helper representing an eviction that keeps all previous record
- * history.
- */
- public static <R> FullStream<R> window() {
- return new FullStream<R>();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
deleted file mode 100644
index 8581ac5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-/**
- * {@link Timestamp} implementation to be used when system time is needed to
- * determine windows
- */
-public class SystemTimestamp<T> implements Timestamp<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(T value) {
- return System.currentTimeMillis();
- }
-
- public static <R> TimestampWrapper<R> getWrapper() {
- return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
deleted file mode 100644
index 022f975..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a time based count or eviction policy. By default the
- * time is measured with {@link System#currentTimeMillis()} in
- * {@link SystemTimestamp}.
- *
- * @param <DATA>
- * The data type which is handled by the time stamp used in the
- * policy represented by this helper
- */
-public class Time<DATA> extends WindowingHelper<DATA> {
-
- protected long length;
- protected TimeUnit granularity;
- protected TimestampWrapper<DATA> timestampWrapper;
- protected long delay;
-
- /**
- * Creates a helper representing a trigger which triggers every given
- * length or an eviction which evicts all elements older than length.
- *
- * @param length
- * The number of time units
- * @param timeUnit
- * The unit of time such as minute oder millisecond. Note that
- * the smallest possible granularity is milliseconds. Any smaller
- * time unit might cause an error at runtime due to conversion
- * problems.
- * @param timestamp
- * The user defined timestamp that will be used to extract time
- * information from the incoming elements
- * @param startTime
- * The startTime of the stream for computing the first window
- */
- private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
- this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
- }
-
- /**
- * Creates a helper representing a trigger which triggers every given
- * length or an eviction which evicts all elements older than length.
- *
- * @param length
- * The number of time units
- * @param timeUnit
- * The unit of time such as minute oder millisecond. Note that
- * the smallest possible granularity is milliseconds. Any smaller
- * time unit might cause an error at runtime due to conversion
- * problems.
- * @param timestampWrapper
- * The user defined {@link TimestampWrapper} that will be used to
- * extract time information from the incoming elements
- */
- private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
- this.length = length;
- this.granularity = timeUnit;
- this.timestampWrapper = timestampWrapper;
- }
-
- @Override
- public EvictionPolicy<DATA> toEvict() {
- return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
- }
-
- @Override
- public TriggerPolicy<DATA> toTrigger() {
- return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper);
- }
-
- /**
- * Creates a helper representing a time trigger which triggers every given
- * length (slide size) or a time eviction which evicts all elements older
- * than length (window size) using System time.
- *
- * @param length
- * The number of time units
- * @param timeUnit
- * The unit of time such as minute oder millisecond. Note that
- * the smallest possible granularity is milliseconds. Any smaller
- * time unit might cause an error at runtime due to conversion
- * problems.
- * @return Helper representing the time based trigger and eviction policy
- */
- @SuppressWarnings("unchecked")
- public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
- return new Time<DATA>(length, timeUnit,
- (TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
- }
-
- /**
- * Creates a helper representing a time trigger which triggers every given
- * length (slide size) or a time eviction which evicts all elements older
- * than length (window size) using a user defined timestamp extractor.
- *
- * @param length
- * The number of time units
- * @param timestamp
- * The user defined timestamp that will be used to extract time
- * information from the incoming elements
- * @param startTime
- * The startTime used to compute the first window
- * @return Helper representing the time based trigger and eviction policy
- */
- public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
- return new Time<DATA>(length, null, timestamp, startTime);
- }
-
- /**
- * Creates a helper representing a time trigger which triggers every given
- * length (slide size) or a time eviction which evicts all elements older
- * than length (window size) using a user defined timestamp extractor. By
- * default the start time is set to 0.
- *
- * @param length
- * The number of time units
- * @param timestamp
- * The user defined timestamp that will be used to extract time
- * information from the incoming elements
- * @return Helper representing the time based trigger and eviction policy
- */
- public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
- return of(length, timestamp, 0);
- }
-
- protected long granularityInMillis() {
- return granularity == null ? length : granularity.toMillis(length);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
deleted file mode 100644
index fea6020..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- * Type of the value to create the timestamp from.
- */
-public interface Timestamp<T> extends Serializable {
-
- /**
- * Values
- *
- * @param value
- * The value to create the timestamp from
- * @return The timestamp
- */
- public long getTimestamp(T value);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
deleted file mode 100644
index c2ec7c2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-public class TimestampWrapper<T> implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private long startTime;
- private Timestamp<T> timestamp;
-
- public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
- this.timestamp = timeStamp;
- this.startTime = startTime;
- }
-
- public long getTimestamp(T in) {
- return timestamp.getTimestamp(in);
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public boolean isDefaultTimestamp() {
- return timestamp instanceof SystemTimestamp;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof TimestampWrapper)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- TimestampWrapper<T> otherTSW = (TimestampWrapper<T>) other;
- if (timestamp instanceof SystemTimestamp
- && otherTSW.timestamp instanceof SystemTimestamp) {
- return true;
- } else {
- return startTime == otherTSW.startTime
- && timestamp.getClass() == otherTSW.timestamp.getClass();
- }
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
deleted file mode 100644
index 17e142a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * A helper representing a count or eviction policy. Such helper classes are
- * used to provide a nice and well readable API.
- *
- * @param <DATA>
- * the type of input data handled by this helper
- * @see Count
- * @see Time
- * @see Delta
- */
-public abstract class WindowingHelper<DATA> {
-
- /**
- * Provides information for initial value serialization
- * in {@link Delta}, unused in other subclasses.
- */
- protected ExecutionConfig executionConfig;
-
- /**
- * Method for encapsulating the {@link EvictionPolicy}.
- * @return the eviction policy
- */
- public abstract EvictionPolicy<DATA> toEvict();
-
- /**
- * Method for encapsulating the {@link TriggerPolicy}.
- * @return the trigger policy
- */
- public abstract TriggerPolicy<DATA> toTrigger();
-
- /**
- * Setter for the {@link ExecutionConfig} field.
- * @param executionConfig Desired value
- */
- public final void setExecutionConfig(ExecutionConfig executionConfig){
- this.executionConfig = executionConfig;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
deleted file mode 100644
index 29ba9eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * The {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- *
- * This class additionally implements the clone method and can wrap around
- * {@link CloneableEvictionPolicy} to make it active.
- *
- * @param <DATA>
- * The data type handled by this policy
- */
-public class ActiveCloneableEvictionPolicyWrapper<DATA> extends ActiveEvictionPolicyWrapper<DATA>
- implements CloneableEvictionPolicy<DATA> {
-
- /**
- * Auto generated version ID
- */
- private static final long serialVersionUID = 1520261575300622769L;
- CloneableEvictionPolicy<DATA> nestedPolicy;
-
- /**
- * Creates a wrapper which activates the eviction policy which is wrapped
- * in. This means that the nested policy will get called on fake elements as
- * well as on real elements.
- *
- * This specialized version of the {@link ActiveEvictionPolicyWrapper} works
- * with {@link CloneableEvictionPolicy} and is thereby cloneable as well.
- *
- * @param nestedPolicy
- * The policy which should be activated/wrapped in.
- */
- public ActiveCloneableEvictionPolicyWrapper(CloneableEvictionPolicy<DATA> nestedPolicy) {
- super(nestedPolicy);
- this.nestedPolicy = nestedPolicy;
- }
-
- @Override
- public ActiveCloneableEvictionPolicyWrapper<DATA> clone() {
- return new ActiveCloneableEvictionPolicyWrapper<DATA>(nestedPolicy.clone());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
deleted file mode 100644
index fe172bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This interface is used for active eviction policies. beside the functionality
- * inherited from {@link EvictionPolicy} it provides a method which gets called
- * to notify on fake elements.
- *
- * In case an eviction policy implements this interface instead of the
- * {@link EvictionPolicy} interface, not only the real but also fake data points
- * will cause a notification of the eviction.
- *
- * Fake data points are mostly used in windowing based on time to trigger and
- * evict even if no element arrives at all during a windows duration.
- */
-public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
-
- /**
- * Proves if and how many elements should be deleted from the element
- * buffer. The eviction takes place after the trigger and after the call to
- * the UDF. This method is only called with fake elements.
- *
- * Note: Fake elements are always considered as triggered. Therefore this
- * method does not have a triggered parameter.
- *
- * @param datapoint
- * the current fake data point
- * @param bufferSize
- * the current size of the buffer (only real elements are
- * counted)
- * @return the number of elements to delete from the buffer (only real
- * elements are counted)
- */
- public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
deleted file mode 100644
index b3b6935..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- *
- * @param <DATA>
- * The data type handled by this policy
- */
-public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<DATA> {
-
- /**
- * Auto generated version ID
- */
- private static final long serialVersionUID = -7656558669799505882L;
- private EvictionPolicy<DATA> nestedPolicy;
-
- /**
- * Creates a wrapper which activates the eviction policy which is wrapped
- * in. This means that the nested policy will get called on fake elements as
- * well as on real elements.
- *
- * @param nestedPolicy
- * The policy which should be activated/wrapped in.
- */
- public ActiveEvictionPolicyWrapper(EvictionPolicy<DATA> nestedPolicy) {
- if (nestedPolicy == null) {
- throw new RuntimeException("The nested policy must not be null.");
- }
- this.nestedPolicy = nestedPolicy;
- }
-
- @Override
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
- return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
- return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
deleted file mode 100644
index c44be37..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * In case an {@link ActiveTriggerPolicy} is used, it can implement own
- * {@link Runnable} classes. Such {@link Runnable} classes will be executed as
- * an own thread and can submit fake elements, to the element buffer at any
- * time.
- *
- * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
- * instance of this interface as parameter. The describes adding of elements can
- * be done by the runnable using the methods provided in this interface.
- *
- */
-public interface ActiveTriggerCallback {
-
- /**
- * Submits a new fake data point to the element buffer. Such a fake element
- * might be used to trigger at any time, but will never be included in the
- * result of the reduce function. The submission of a fake element causes
- * notifications only at the {@link ActiveTriggerPolicy} and
- * {@link ActiveEvictionPolicy} implementations.
- *
- * @param datapoint
- * the fake data point to be added
- */
- public void sendFakeElement(Object datapoint);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
deleted file mode 100644
index b645c0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-
-/**
- * This interface extends the {@link TriggerPolicy} interface with functionality
- * for active triggers. Active triggers can act in two ways:
- *
- * 1) Whenever an element arrives at the operator, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
- * first. It can return zero ore more fake data points which will be added
- * before the currently arrived real element gets processed. This allows to
- * handle empty windows in time based windowing with an user defined
- * {@link Timestamp}. Triggers are not called on fake datapoint. A fake
- * datapoint is always considered as triggered.
- *
- * 2) An active trigger has a factory method for a runnable. This factory method
- * gets called at the start up of the operator. The returned runnable will be
- * executed in its own thread and can submit fake elements at any time through an
- * {@link ActiveTriggerCallback}. This allows to have time based triggers based
- * on any system internal time measure. Triggers are not called on fake
- * datapoint. A fake datapoints is always considered as triggered.
- *
- * @param <DATA>
- * The data type which can be handled by this policy
- */
-public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
-
- /**
- * Whenever an element arrives at the operator, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
- * first. It can return zero ore more fake data points which will be added
- * before the the currently arrived real element gets processed. This allows
- * to handle empty windows in time based windowing with an user defined
- * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
- * datapoint is always considered as triggered.
- *
- * @param datapoint
- * the data point which arrived at the operator
- * @return zero ore more fake data points which will be added before the the
- * currently arrived real element gets processed.
- */
- public Object[] preNotifyTrigger(DATA datapoint);
-
- /**
- * This is the factory method for a runnable. This factory method gets
- * called at the start up of the operator. The returned runnable will be
- * executed in its own thread and can submit fake elements at any time through
- * an {@link ActiveTriggerCallback}. This allows to have time based triggers
- * based on any system internal time measure. Triggers are not called on
- * fake datapoints. A fake datapoint is always considered as triggered.
- *
- * @param callback
- * A callback object which allows to add fake elements from
- * within the returned {@link Runnable}.
- * @return The runnable implementation or null in case there is no. In case
- * an {@link ActiveTriggerPolicy} is used, it can implement own
- * {@link Runnable} classes. Such {@link Runnable} classes will be
- * executed as an own thread and can submit fake elements, to the
- * element buffer at any time.
- */
- public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
deleted file mode 100644
index 308f152..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * Interface for defining grouped windowing policies which can interact with
- * other groups to trigger on the latest available information globally
- * available to all groups.</p> At predefined time intervals the discretizers
- * takes the last globally seen element, and notifies all groups (but the one
- * that already have seen the object). This allows to trigger before an element
- * comes from the next window for a specific group. This pattern can be
- * used for instance in time policies to regularly broadcast the current time to
- * all groups.
- */
-public interface CentralActiveTrigger<DATA> extends CloneableTriggerPolicy<DATA> {
-
- /**
- * This method is called to broadcast information about the last globally
- * seen data point to all triggers. The number of elements returned in the
- * array will determine the number of triggers at that point, while the
- * elements themselves are used only for active eviction.
- *
- * @param datapoint
- * The last globally seen data
- * @return An object of fake elements. If returned null or empty list, no
- * triggers will occur.
- */
- public Object[] notifyOnLastGlobalElement(DATA datapoint);
-
-}