You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:50 UTC
[34/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
deleted file mode 100644
index 3e1ff57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a union of several input
- * {@link StreamTransformation StreamTransformations}.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code UnionTransformation}
- */
-public class UnionTransformation<T> extends StreamTransformation<T> {
- private final List<StreamTransformation<T>> inputs;
-
- /**
- * Creates a new {@code UnionTransformation} from the given input {@code StreamTransformations}.
- *
- * <p>
- * The input {@code StreamTransformations} must all have the same type.
- *
- * @param inputs The list of input {@code StreamTransformations}
- */
- public UnionTransformation(List<StreamTransformation<T>> inputs) {
- super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());
-
- for (StreamTransformation<T> input: inputs) {
- if (!input.getOutputType().equals(getOutputType())) {
- throw new UnsupportedOperationException("Type mismatch in input " + input);
- }
- }
-
- this.inputs = Lists.newArrayList(inputs);
- }
-
- /**
- * Returns the list of input {@code StreamTransformations}.
- */
- public List<StreamTransformation<T>> getInputs() {
- return inputs;
- }
-
- @Override
- public Collection<StreamTransformation<?>> getTransitivePredecessors() {
- List<StreamTransformation<?>> result = Lists.newArrayList();
- result.add(this);
- for (StreamTransformation<T> input: inputs) {
- result.addAll(input.getTransitivePredecessors());
- }
- return result;
- }
-
- @Override
- public final void setChainingStrategy(ChainingStrategy strategy) {
- throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
deleted file mode 100644
index 838c24a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.watermark;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-
-/**
- * A Watermark tells operators that receive it that no elements with a timestamp older or equal
- * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
- * sources and propagate through the operators of the topology. Operators must themselves emit
- * watermarks to downstream operators using
- * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
- * do not internally buffer elements can always forward the watermark that they receive. Operators
- * that buffer elements, such as window operators, must forward a watermark after emission of
- * elements that is triggered by the arriving watermark.
- *
- * <p>
- * In some cases a watermark is only a heuristic and operators should be able to deal with
- * late elements. They can either discard those or update the result and emit updates/retractions
- * to downstream operations.
- *
- * <p>
- * When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}. When
- * an operator receives this it will know that no more input will be arriving in the future.
- *
- */
-public class Watermark extends StreamElement {
-
- /** The timestamp of the watermark */
- private final long timestamp;
-
- /**
- * Creates a new watermark with the given timestamp.
- */
- public Watermark(long timestamp) {
- this.timestamp = timestamp;
- }
-
- /**
- * Returns the timestamp associated with this {@link Watermark} in milliseconds.
- */
- public long getTimestamp() {
- return timestamp;
- }
-
- @Override
- public boolean equals(Object o) {
- return this == o ||
- o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
- }
-
- @Override
- public int hashCode() {
- return (int) (timestamp ^ (timestamp >>> 32));
- }
-
- @Override
- public String toString() {
- return "Watermark @ " + timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
deleted file mode 100644
index 4d5b9d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that assigns all elements to the same global window.
- *
- * <p>
- * Use this if you want to use a {@link Trigger} and
- * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based
- * windows.
- */
-public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
- private static final long serialVersionUID = 1L;
-
- private GlobalWindows() {}
-
- @Override
- public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
- return Collections.singletonList(GlobalWindow.get());
- }
-
- @Override
- public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
- return new NeverTrigger();
- }
-
- @Override
- public String toString() {
- return "GlobalWindows()";
- }
-
- /**
- * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
- * all elements to the same {@link GlobalWindow}.
- *
- * @return The global window policy.
- */
- public static GlobalWindows create() {
- return new GlobalWindows();
- }
-
- /**
- * A trigger that never fires, as default Trigger for GlobalWindows.
- */
- private static class NeverTrigger implements Trigger<Object, GlobalWindow> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public TriggerResult onElement(Object element,
- long timestamp,
- GlobalWindow window,
- TriggerContext ctx) {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
- return TriggerResult.CONTINUE;
- }
- }
-
- @Override
- public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
- return new GlobalWindow.Serializer();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
deleted file mode 100644
index 5f7ab45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
- * elements. Windows can possibly overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- * keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private final long size;
-
- private final long slide;
-
- private SlidingTimeWindows(long size, long slide) {
- this.size = size;
- this.slide = slide;
- }
-
- @Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
- long lastStart = timestamp - timestamp % slide;
- for (long start = lastStart;
- start > timestamp - size;
- start -= slide) {
- windows.add(new TimeWindow(start, start + size));
- }
- return windows;
- }
-
- public long getSize() {
- return size;
- }
-
- public long getSlide() {
- return slide;
- }
-
- @Override
- public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
- if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
- return ProcessingTimeTrigger.create();
- } else {
- return EventTimeTrigger.create();
- }
- }
-
- @Override
- public String toString() {
- return "SlidingTimeWindows(" + size + ", " + slide + ")";
- }
-
- /**
- * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
- * elements to sliding time windows based on the element timestamp.
- *
- * @param size The size of the generated windows.
- * @param slide The slide interval of the generated windows.
- * @return The time policy.
- */
- public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
- return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
- }
-
- @Override
- public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
- return new TimeWindow.Serializer();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
deleted file mode 100644
index 463b2c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
- * elements. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- * keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private long size;
-
- private TumblingTimeWindows(long size) {
- this.size = size;
- }
-
- @Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- long start = timestamp - (timestamp % size);
- return Collections.singletonList(new TimeWindow(start, start + size));
- }
-
- public long getSize() {
- return size;
- }
-
- @Override
- public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
- if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
- return ProcessingTimeTrigger.create();
- } else {
- return EventTimeTrigger.create();
- }
- }
-
- @Override
- public String toString() {
- return "TumblingTimeWindows(" + size + ")";
- }
-
- /**
- * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
- * elements to time windows based on the element timestamp.
- *
- * @param size The size of the generated windows.
- * @return The time policy.
- */
- public static TumblingTimeWindows of(AbstractTime size) {
- return new TumblingTimeWindows(size.toMilliseconds());
- }
-
- @Override
- public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
- return new TimeWindow.Serializer();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
deleted file mode 100644
index 4b4b1ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import java.io.Serializable;
-
-import java.util.Collection;
-
-/**
- * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
- *
- * <p>
- * In a window operation, elements are grouped by their key (if available) and by the windows to
- * which it was assigned. The set of elements with the same key and window is called a pane.
- * When a {@link Trigger} decides that a certain pane should fire the
- * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
- * to produce output elements for that pane.
- *
- * @param <T> The type of elements that this WindowAssigner can assign windows to.
- * @param <W> The type of {@code Window} that this assigner assigns.
- */
-public abstract class WindowAssigner<T, W extends Window> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Returns a {@code Collection} of windows that should be assigned to the element.
- *
- * @param element The element to which windows should be assigned.
- * @param timestamp The timestamp of the element.
- */
- public abstract Collection<W> assignWindows(T element, long timestamp);
-
- /**
- * Returns the default trigger associated with this {@code WindowAssigner}.
- */
- public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
-
- /**
- * Returns a {@link TypeSerializer} for serializing windows that are assigned by
- * this {@code WindowAssigner}.
- */
- public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
deleted file mode 100644
index 0a078e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps only a certain amount of elements.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class CountEvictor<W extends Window> implements Evictor<Object, W> {
- private static final long serialVersionUID = 1L;
-
- private final long maxCount;
-
- private CountEvictor(long count) {
- this.maxCount = count;
- }
-
- @Override
- public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
- if (size > maxCount) {
- return (int) (size - maxCount);
- } else {
- return 0;
- }
- }
-
- /**
- * Creates a {@code CountEvictor} that keeps the given number of elements.
- *
- * @param maxCount The number of elements to keep in the pane.
- */
- public static <W extends Window> CountEvictor<W> of(long maxCount) {
- return new CountEvictor<>(maxCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
deleted file mode 100644
index 0083a04..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
- *
- * <p>
- * Eviction starts from the first element of the buffer and removes all elements from the buffer
- * which have a higher delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
- private static final long serialVersionUID = 1L;
-
- DeltaFunction<T> deltaFunction;
- private double threshold;
-
- private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
- this.deltaFunction = deltaFunction;
- this.threshold = threshold;
- }
-
- @Override
- public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
- StreamRecord<T> lastElement = Iterables.getLast(elements);
- int toEvict = 0;
- for (StreamRecord<T> element : elements) {
- if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
- break;
- }
- toEvict++;
- }
-
- return toEvict;
- }
-
- @Override
- public String toString() {
- return "DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
- }
-
- /**
- * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
- *
- * @param threshold The threshold
- * @param deltaFunction The {@code DeltaFunction}
- */
- public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
- return new DeltaEvictor<>(threshold, deltaFunction);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
deleted file mode 100644
index 5ceaf2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.io.Serializable;
-
-/**
- * An {@code Evictor} can remove elements from a pane before it is being processed and after
- * window evaluation was triggered by a
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code Evictor} can evict.
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public interface Evictor<T, W extends Window> extends Serializable {
-
- /**
- * Computes how many elements should be removed from the pane. The result specifies how
- * many elements should be removed from the beginning.
- *
- * @param elements The elements currently in the pane.
- * @param size The current number of elements in the pane.
- * @param window The {@link Window}
- */
- int evict(Iterable<StreamRecord<T>> elements, int size, W window);
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
deleted file mode 100644
index 5776d8d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps elements for a certain amount of time. Elements older
- * than {@code current_time - keep_time} are evicted.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
- private static final long serialVersionUID = 1L;
-
- private final long windowSize;
-
- public TimeEvictor(long windowSize) {
- this.windowSize = windowSize;
- }
-
- @Override
- public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
- int toEvict = 0;
- long currentTime = Iterables.getLast(elements).getTimestamp();
- long evictCutoff = currentTime - windowSize;
- for (StreamRecord<Object> record: elements) {
- if (record.getTimestamp() > evictCutoff) {
- break;
- }
- toEvict++;
- }
- return toEvict;
- }
-
- @Override
- public String toString() {
- return "TimeEvictor(" + windowSize + ")";
- }
-
- @VisibleForTesting
- public long getWindowSize() {
- return windowSize;
- }
-
- /**
- * Creates a {@code TimeEvictor} that keeps the given number of elements.
- *
- * @param windowSize The amount of time for which to keep elements.
- */
- public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
- return new TimeEvictor<>(windowSize.toMilliseconds());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
deleted file mode 100644
index 3f8fb60..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for {@link Time} implementations.
- */
-public abstract class AbstractTime {
-
- /** The time unit for this policy's time interval */
- private final TimeUnit unit;
-
- /** The size of the windows generated by this policy */
- private final long size;
-
-
- protected AbstractTime(long size, TimeUnit unit) {
- this.unit = checkNotNull(unit, "time unit may not be null");
- this.size = size;
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the time unit for this policy's time interval.
- * @return The time unit for this policy's time interval.
- */
- public TimeUnit getUnit() {
- return unit;
- }
-
- /**
- * Gets the length of this policy's time interval.
- * @return The length of this policy's time interval.
- */
- public long getSize() {
- return size;
- }
-
- /**
- * Converts the time interval to milliseconds.
- * @return The time interval in milliseconds.
- */
- public long toMilliseconds() {
- return unit.toMillis(size);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
-
- @Override
- public int hashCode() {
- return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj.getClass() == getClass()) {
- AbstractTime that = (AbstractTime) obj;
- return this.size == that.size && this.unit.equals(that.unit);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
deleted file mode 100644
index 6a4349c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time.
- */
-public final class EventTime extends AbstractTime {
-
- /** Instantiation only via factory method */
- private EventTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
- if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use EventTime policy in a dataflow that runs on " + characteristic);
- }
- }
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates an event time policy describing an event time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The event time policy.
- */
- public static EventTime of(long size, TimeUnit unit) {
- return new EventTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
deleted file mode 100644
index 4be6ed0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTime {
-
- /** Instantiation only via factory method */
- private ProcessingTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
- if (characteristic == TimeCharacteristic.ProcessingTime) {
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
- }
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a processing time policy describing a processing time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The processing time policy.
- */
- public static ProcessingTime of(long size, TimeUnit unit) {
- return new ProcessingTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
deleted file mode 100644
index d1b3fe3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a time interval for windowing. The time characteristic referred
- * to is the default time characteristic set on the execution environment.
- */
-public final class Time extends AbstractTime {
-
- /** Instantiation only via factory method */
- private Time(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
- switch (timeCharacteristic) {
- case ProcessingTime:
- return ProcessingTime.of(getSize(), getUnit());
- case IngestionTime:
- case EventTime:
- return EventTime.of(getSize(), getUnit());
- default:
- throw new IllegalArgumentException("Unknown time characteristic");
- }
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a time policy describing a processing time interval. The policy refers to the
- * time characteristic that is set on the dataflow via
- * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
- * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The time policy.
- */
- public static Time of(long size, TimeUnit unit) {
- return new Time(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
deleted file mode 100644
index ea26309..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. This fires based
- * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object, W> {
- private static final long serialVersionUID = 1L;
-
- private final long interval;
-
- private ContinuousEventTimeTrigger(long interval) {
- this.interval = interval;
- }
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
-
- OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
-
- if (first.value()) {
- long start = timestamp - (timestamp % interval);
- long nextFireTimestamp = start + interval;
-
- ctx.registerEventTimeTimer(nextFireTimestamp);
-
- first.update(false);
- return TriggerResult.CONTINUE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) {
- ctx.registerEventTimeTimer(time + interval);
- return TriggerResult.FIRE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public String toString() {
- return "ContinuousProcessingTimeTrigger(" + interval + ")";
- }
-
- @VisibleForTesting
- public long getInterval() {
- return interval;
- }
-
- /**
- * Creates a trigger that continuously fires based on the given interval.
- *
- * @param interval The time interval at which to fire.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
- public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) {
- return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
deleted file mode 100644
index be56738..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
- private static final long serialVersionUID = 1L;
-
- private final long interval;
-
- private ContinuousProcessingTimeTrigger(long interval) {
- this.interval = interval;
- }
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
- long currentTime = System.currentTimeMillis();
-
- OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
- long nextFireTimestamp = fireState.value();
-
- if (nextFireTimestamp == 0) {
- long start = currentTime - (currentTime % interval);
- fireState.update(start + interval);
-
- ctx.registerProcessingTimeTimer(nextFireTimestamp);
- return TriggerResult.CONTINUE;
- }
- if (currentTime > nextFireTimestamp) {
- long start = currentTime - (currentTime % interval);
- fireState.update(start + interval);
-
- ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
- return TriggerResult.FIRE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
-
- OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
- long nextFireTimestamp = fireState.value();
-
- // only fire if an element didn't already fire
- long currentTime = System.currentTimeMillis();
- if (currentTime > nextFireTimestamp) {
- long start = currentTime - (currentTime % interval);
- fireState.update(start + interval);
- return TriggerResult.FIRE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @VisibleForTesting
- public long getInterval() {
- return interval;
- }
-
- @Override
- public String toString() {
- return "ContinuousProcessingTimeTrigger(" + interval + ")";
- }
-
- /**
- * Creates a trigger that continuously fires based on the given interval.
- *
- * @param interval The time interval at which to fire.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
- public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
- return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
deleted file mode 100644
index 8512989..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.IOException;
-
-/**
- * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class CountTrigger<W extends Window> implements Trigger<Object, W> {
- private static final long serialVersionUID = 1L;
-
- private final long maxCount;
-
- private CountTrigger(long maxCount) {
- this.maxCount = maxCount;
- }
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
- OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
- long currentCount = count.value() + 1;
- count.update(currentCount);
- if (currentCount >= maxCount) {
- count.update(0L);
- return TriggerResult.FIRE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public String toString() {
- return "CountTrigger(" + maxCount + ")";
- }
-
- /**
- * Creates a trigger that fires once the number of elements in a pane reaches the given count.
- *
- * @param maxCount The count of elements at which to fire.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
- public static <W extends Window> CountTrigger<W> of(long maxCount) {
- return new CountTrigger<>(maxCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
deleted file mode 100644
index 1c6523d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.Serializable;
-
-/**
- * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
- *
- * <p>
- * This trigger calculates a delta between the data point which triggered last
- * and the currently arrived data point. It triggers if the delta is higher than
- * a specified threshold.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class DeltaTrigger<T extends Serializable, W extends Window> implements Trigger<T, W> {
- private static final long serialVersionUID = 1L;
-
- private final DeltaFunction<T> deltaFunction;
- private final double threshold;
-
- private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
- this.deltaFunction = deltaFunction;
- this.threshold = threshold;
- }
-
- @Override
- public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
- OperatorState<T> lastElementState = ctx.getKeyValueState("last-element", null);
- if (lastElementState.value() == null) {
- lastElementState.update(element);
- return TriggerResult.CONTINUE;
- }
- if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
- lastElementState.update(element);
- return TriggerResult.FIRE;
- }
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public String toString() {
- return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
- }
-
- /**
- * Creates a delta trigger from the given threshold and {@code DeltaFunction}.
- *
- * @param threshold The threshold at which to trigger.
- * @param deltaFunction The delta function to use
- *
- * @param <T> The type of elements on which this trigger can operate.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
- public static <T extends Serializable, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
- return new DeltaTrigger<>(threshold, deltaFunction);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
deleted file mode 100644
index 4b6613c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the watermark passes the end of the window
- * to which a pane belongs.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
-public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private EventTimeTrigger() {}
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
- ctx.registerEventTimeTimer(window.maxTimestamp());
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) {
- return TriggerResult.FIRE_AND_PURGE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public String toString() {
- return "EventTimeTrigger()";
- }
-
- /**
- * Creates trigger that fires once the watermark passes the end of the window.
- */
- public static EventTimeTrigger create() {
- return new EventTimeTrigger();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
deleted file mode 100644
index 6278ba6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the current system time passes the end of the window
- * to which a pane belongs.
- */
-public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private ProcessingTimeTrigger() {}
-
- @Override
- public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
- ctx.registerProcessingTimeTimer(window.maxTimestamp());
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onEventTime(long time,
- TriggerContext ctx) throws Exception {
- return TriggerResult.CONTINUE;
- }
-
- @Override
- public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
- return TriggerResult.FIRE_AND_PURGE;
- }
-
- @Override
- public String toString() {
- return "ProcessingTimeTrigger()";
- }
-
- /**
- * Creates a new trigger that fires once system time passes the end of the window.
- */
- public static ProcessingTimeTrigger create() {
- return new ProcessingTimeTrigger();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
deleted file mode 100644
index eaca336..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A trigger that can turn any {@link Trigger} into a purging {@code Trigger}.
- *
- * <p>
- * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}
- *
- * @param <T> The type of elements on which this trigger can operate.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
- private static final long serialVersionUID = 1L;
-
- private Trigger<T, W> nestedTrigger;
-
- private PurgingTrigger(Trigger<T, W> nestedTrigger) {
- this.nestedTrigger = nestedTrigger;
- }
-
- @Override
- public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
- TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
- switch (triggerResult) {
- case FIRE:
- return TriggerResult.FIRE_AND_PURGE;
- case FIRE_AND_PURGE:
- return TriggerResult.FIRE_AND_PURGE;
- default:
- return TriggerResult.CONTINUE;
- }
- }
-
- @Override
- public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
- TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx);
- switch (triggerResult) {
- case FIRE:
- return TriggerResult.FIRE_AND_PURGE;
- case FIRE_AND_PURGE:
- return TriggerResult.FIRE_AND_PURGE;
- default:
- return TriggerResult.CONTINUE;
- }
- }
-
- @Override
- public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
- TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx);
- switch (triggerResult) {
- case FIRE:
- return TriggerResult.FIRE_AND_PURGE;
- case FIRE_AND_PURGE:
- return TriggerResult.FIRE_AND_PURGE;
- default:
- return TriggerResult.CONTINUE;
- }
- }
-
- @Override
- public String toString() {
- return "PurgingTrigger(" + nestedTrigger.toString() + ")";
- }
-
- /**
- * Creates a new purging trigger from the given {@code Trigger}.
- *
- * @param nestedTrigger The trigger that is wrapped by this purging trigger
- */
- public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
- return new PurgingTrigger<>(nestedTrigger);
- }
-
- @VisibleForTesting
- public Trigger<T, W> getNestedTrigger() {
- return nestedTrigger;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
deleted file mode 100644
index ef8110b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.Serializable;
-
-/**
- * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
- * results for that part of the window.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Trigger}.
- *
- * <p>
- * Triggers must not maintain state internally since they can be re-created or reused for
- * different keys. All necessary state should be persisted using the state abstraction
- * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
- *
- * @param <T> The type of elements on which this {@code Trigger} works.
- * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
- */
-public interface Trigger<T, W extends Window> extends Serializable {
-
- /**
- * Called for every element that gets added to a pane. The result of this will determine
- * whether the pane is evaluated to emit results.
- *
- * @param element The element that arrived.
- * @param timestamp The timestamp of the element that arrived.
- * @param window The window to which this pane belongs.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
-
- /**
- * Called when a processing-time timer that was set using the trigger context fires.
- *
- * @param time The timestamp at which the timer fired.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception;
-
- /**
- * Called when an event-time timer that was set using the trigger context fires.
- *
- * @param time The timestamp at which the timer fired.
- * @param ctx A context object that can be used to register timer callbacks.
- */
- TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception;
-
-
- /**
- * Result type for trigger methods. This determines what happens which the window.
- *
- * <p>
- * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
- * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
- * are purged. On {@code CONTINUE} nothing happens, processing continues.
- */
- enum TriggerResult {
- CONTINUE, FIRE_AND_PURGE, FIRE
- }
-
- /**
- * A context object that is given to {@code Trigger} methods to allow them to register timer
- * callbacks and deal with state.
- */
- interface TriggerContext {
-
- /**
- * Register a system time callback. When the current system time passes the specified
- * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here.
- *
- * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)}
- */
- void registerProcessingTimeTimer(long time);
-
- /**
- * Register an event-time callback. When the current watermark passes the specified
- * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- *
- * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)}
- */
- void registerEventTimeTimer(long time);
-
- /**
- * Retrieves an {@link OperatorState} object that can be used to interact with
- * fault-tolerant state that is scoped to the window and key of the current
- * trigger invocation.
- *
- * @param name A unique key for the state.
- * @param defaultState The default value of the state.
- */
- <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
deleted file mode 100644
index f20c779..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-public class GlobalWindow extends Window {
-
- private static GlobalWindow INSTANCE = new GlobalWindow();
-
- private GlobalWindow() { }
-
- public static GlobalWindow get() {
- return INSTANCE;
- }
-
- @Override
- public long maxTimestamp() {
- return Long.MAX_VALUE;
- }
-
- @Override
- public boolean equals(Object o) {
- return this == o || !(o == null || getClass() != o.getClass());
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- @Override
- public String toString() {
- return "GlobalWindow";
- }
-
- public static class Serializer extends TypeSerializer<GlobalWindow> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public TypeSerializer<GlobalWindow> duplicate() {
- return this;
- }
-
- @Override
- public GlobalWindow createInstance() {
- return GlobalWindow.INSTANCE;
- }
-
- @Override
- public GlobalWindow copy(GlobalWindow from) {
- return from;
- }
-
- @Override
- public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
- return from;
- }
-
- @Override
- public int getLength() {
- return 0;
- }
-
- @Override
- public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
- target.writeByte(0);
- }
-
- @Override
- public GlobalWindow deserialize(DataInputView source) throws IOException {
- source.readByte();
- return GlobalWindow.INSTANCE;
- }
-
- @Override
- public GlobalWindow deserialize(GlobalWindow reuse,
- DataInputView source) throws IOException {
- source.readByte();
- return GlobalWindow.INSTANCE;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- source.readByte();
- target.writeByte(0);
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
deleted file mode 100644
index 0c4c2a8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-/**
- * A {@link Window} that represents a time interval from {@code start} (inclusive) to
- * {@code start + size} (exclusive).
- */
-public class TimeWindow extends Window {
-
- private final long start;
- private final long end;
-
- public TimeWindow(long start, long end) {
- this.start = start;
- this.end = end;
- }
-
- public long getStart() {
- return start;
- }
-
- public long getEnd() {
- return end;
- }
-
- @Override
- public long maxTimestamp() {
- return end - 1;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TimeWindow window = (TimeWindow) o;
-
- return end == window.end && start == window.start;
- }
-
- @Override
- public int hashCode() {
- int result = (int) (start ^ (start >>> 32));
- result = 31 * result + (int) (end ^ (end >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TimeWindow{" +
- "start=" + start +
- ", end=" + end +
- '}';
- }
-
- public static class Serializer extends TypeSerializer<TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public TypeSerializer<TimeWindow> duplicate() {
- return this;
- }
-
- @Override
- public TimeWindow createInstance() {
- return null;
- }
-
- @Override
- public TimeWindow copy(TimeWindow from) {
- return from;
- }
-
- @Override
- public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
- return from;
- }
-
- @Override
- public int getLength() {
- return 0;
- }
-
- @Override
- public void serialize(TimeWindow record, DataOutputView target) throws IOException {
- target.writeLong(record.start);
- target.writeLong(record.end);
- }
-
- @Override
- public TimeWindow deserialize(DataInputView source) throws IOException {
- long start = source.readLong();
- long end = source.readLong();
- return new TimeWindow(start, end);
- }
-
- @Override
- public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
- long start = source.readLong();
- long end = source.readLong();
- return new TimeWindow(start, end);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeLong(source.readLong());
- target.writeLong(source.readLong());
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof Serializer;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
- }
-
-}