You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:42 UTC
[06/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and
API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
deleted file mode 100644
index 6bc5072..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-
-/**
- * When used in grouped windowing, eviction policies must
- * provide a clone method. Eviction policies get cloned to provide an own
- * instance for each group and respectively each individual element buffer as
- * groups maintain their own buffers with the elements belonging to the
- * respective group.
- *
- * This interface extends {@link EvictionPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- *
- * @param <DATA>
- * The data type handled by this policy
- */
-public interface CloneableEvictionPolicy<DATA> extends EvictionPolicy<DATA>, Cloneable {
-
- /**
- * This method should return an exact copy of the object it belongs to
- * including the current object state.
- *
- * @return a copy of this object
- */
- public CloneableEvictionPolicy<DATA> clone();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
deleted file mode 100644
index 5adddc4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-/**
- * This policy does the same as {@link MultiEvictionPolicy}. Additionally it is
- * cloneable and only cloneable policies can be passed to the constructor.
- *
- * @param <DATA>
- * The type of data handled by this policy
- */
-public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA> implements
- CloneableEvictionPolicy<DATA> {
-
- /**
- * Default version id.
- */
- private static final long serialVersionUID = 1L;
-
- private CloneableEvictionPolicy<DATA>[] allPolicies;
- private EvictionStrategy strategy;
-
- /**
- * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
- * is cloneable and only cloneable policies can be passed to the
- * constructor.
- *
- * When using this constructor the MAX strategy is used by default. You can
- * select other strategies using
- * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(EvictionStrategy, CloneableEvictionPolicy...)}
- * .
- *
- * @param evictionPolicies
- * some cloneable policies to be tied together.
- */
- public CloneableMultiEvictionPolicy(CloneableEvictionPolicy<DATA>... evictionPolicies) {
- this(EvictionStrategy.MAX, evictionPolicies);
- }
-
- /**
- * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
- * is cloneable and only cloneable policies can be passed to the
- * constructor.
- *
- * @param strategy
- * the strategy to be used. See {@link MultiEvictionPolicy.EvictionStrategy} for a
- * list of possible options.
- * @param evictionPolicies
- * some cloneable policies to be tied together.
- */
- public CloneableMultiEvictionPolicy(EvictionStrategy strategy,
- CloneableEvictionPolicy<DATA>... evictionPolicies) {
- super(strategy, evictionPolicies);
- this.allPolicies = evictionPolicies;
- this.strategy = strategy;
- }
-
- @SuppressWarnings("unchecked")
- public CloneableEvictionPolicy<DATA> clone() {
- LinkedList<CloneableEvictionPolicy<DATA>> clonedPolicies = new LinkedList<CloneableEvictionPolicy<DATA>>();
- for (int i = 0; i < allPolicies.length; i++) {
- clonedPolicies.add(allPolicies[i].clone());
- }
- return new CloneableMultiEvictionPolicy<DATA>(strategy,
- clonedPolicies.toArray(new CloneableEvictionPolicy[allPolicies.length]));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
deleted file mode 100644
index aaecefb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-/**
- * This policy does the same as {@link MultiTriggerPolicy}. Additionally it is
- * cloneable and only cloneable policies can be passed to the constructor.
- *
- * @param <DATA>
- * The type of data handled by this policy
- */
-public class CloneableMultiTriggerPolicy<DATA> extends MultiTriggerPolicy<DATA> implements
- CloneableTriggerPolicy<DATA>, Cloneable {
-
- /**
- * Default version id.
- */
- private static final long serialVersionUID = 1L;
-
- private CloneableTriggerPolicy<DATA>[] allPolicies;
-
- /**
- * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
- * is cloneable and only cloneable policies can be passed to the
- * constructor.
- *
- * @param policies
- * some cloneable policies to be tied together.
- */
- public CloneableMultiTriggerPolicy(CloneableTriggerPolicy<DATA>... policies) {
- super(policies);
- this.allPolicies = policies;
- }
-
- @SuppressWarnings("unchecked")
- public CloneableTriggerPolicy<DATA> clone() {
- LinkedList<CloneableTriggerPolicy<DATA>> clonedPolicies = new LinkedList<CloneableTriggerPolicy<DATA>>();
- for (int i = 0; i < allPolicies.length; i++) {
- clonedPolicies.add(allPolicies[i].clone());
- }
- return new CloneableMultiTriggerPolicy<DATA>(
- clonedPolicies.toArray(new CloneableTriggerPolicy[allPolicies.length]));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
deleted file mode 100644
index 3f55f41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-
-/**
- * When used in grouped windowing, trigger policies can provide
- * a clone method. Cloneable triggers can be used in a distributed manner,
- * which means they get cloned to provide an own instance for each group. This
- * allows each group to trigger individually and only based on the elements
- * belonging to the respective group.
- *
- * This interface extends {@link TriggerPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- *
- * @param <DATA>
- * The data type handled by this policy
- */
-public interface CloneableTriggerPolicy<DATA> extends TriggerPolicy<DATA>, Cloneable {
-
- /**
- * This method should return an exact copy of the object it belongs to
- * including the current object state.
- *
- * @return a copy of this object
- */
- public CloneableTriggerPolicy<DATA> clone();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
deleted file mode 100644
index 9be25d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This eviction policy allows the eviction of data points from the buffer using
- * a counter of arriving elements and a threshold (maximal buffer size)
- *
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it count even fake elements.
- *
- * @param <IN>
- * the type of the incoming data points
- */
-public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
-
- /**
- * Auto generated version id
- */
- private static final long serialVersionUID = 2319201348806427996L;
-
- private int maxElements;
- private int counter;
- private int deleteOnEviction = 1;
- private int startValue;
-
- /**
- * This constructor allows the setup of the simplest possible count based
- * eviction. It keeps the size of the buffer according to the given
- * maxElements parameter by deleting the oldest element in the buffer.
- * Eviction only takes place if the counter of arriving elements would be
- * higher than maxElements without eviction.
- *
- * @param maxElements
- * The maximum number of elements before eviction. As soon as one
- * more element arrives, the oldest element will be deleted
- */
- public CountEvictionPolicy(int maxElements) {
- this(maxElements, 1);
- }
-
- /**
- * This constructor allows to set up both, the maximum number of elements
- * and the number of elements to be deleted in case of an eviction.
- *
- * Eviction only takes place if the counter of arriving elements would be
- * higher than maxElements without eviction. In such a case deleteOnEviction
- * elements will be removed from the buffer.
- *
- * The counter of arriving elements is adjusted respectively, but never set
- * below zero:
- * counter=(counter-deleteOnEviction<0)?0:counter-deleteOnEviction
- *
- * @param maxElements
- * maxElements The maximum number of elements before eviction.
- * @param deleteOnEviction
- * The number of elements to be deleted on eviction. The counter
- * will be adjusted respectively but never below zero.
- */
- public CountEvictionPolicy(int maxElements, int deleteOnEviction) {
- this(maxElements, deleteOnEviction, 0);
- }
-
- /**
- * The same as {@link CountEvictionPolicy#CountEvictionPolicy(int, int)}.
- * Additionally a custom start value for the counter of arriving elements
- * can be set. By setting a negative start value the first eviction can be
- * delayed.
- *
- * @param maxElements
- * maxElements The maximum number of elements before eviction.
- * @param deleteOnEviction
- * The number of elements to be deleted on eviction. The counter
- * will be adjusted respectively but never below zero.
- * @param startValue
- * A custom start value for the counter of arriving elements.
- * @see CountEvictionPolicy#CountEvictionPolicy(int, int)
- */
- public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
- this.counter = startValue;
- this.deleteOnEviction = deleteOnEviction;
- this.maxElements = maxElements;
- this.startValue = startValue;
- }
-
- @Override
- public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
- // The comparison have to be >= and not == to cover case max=0
- if (counter >= maxElements) {
- // Adjust the counter according to the current eviction
- counter = (counter - deleteOnEviction < 0) ? 0 : counter - deleteOnEviction;
- // The current element will be added after the eviction
- // Therefore, increase counter in any case
- counter++;
- return deleteOnEviction;
- } else {
- counter++;
- return 0;
- }
- }
-
- @Override
- public CountEvictionPolicy<IN> clone() {
- return new CountEvictionPolicy<IN>(maxElements, deleteOnEviction, counter);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof CountEvictionPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- CountEvictionPolicy<IN> otherPolicy = (CountEvictionPolicy<IN>) other;
- return startValue == otherPolicy.startValue
- && deleteOnEviction == otherPolicy.deleteOnEviction
- && maxElements == otherPolicy.maxElements;
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- public int getWindowSize() {
- return maxElements;
- }
-
- public int getStart() {
- return startValue;
- }
-
- public int getDeleteOnEviction(){
- return deleteOnEviction;
- }
-
- @Override
- public String toString() {
- return "CountPolicy(" + maxElements + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
deleted file mode 100644
index 9bd6f82..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This policy triggers at every n'th element.
- *
- * @param <IN>
- * The type of the data points which are handled by this policy
- */
-public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
-
- /**
- * Auto generated version ID
- */
- private static final long serialVersionUID = -6357200688886103968L;
-
- public static final int DEFAULT_START_VALUE = 0;
-
- private int counter;
- private int max;
- private int startValue;
-
- /**
- * This constructor will set up a count based trigger, which triggers after
- * max elements have arrived.
- *
- * @param max
- * The number of arriving elements before the trigger occurs.
- */
- public CountTriggerPolicy(int max) {
- this(max, DEFAULT_START_VALUE);
- }
-
- /**
- * In addition to {@link CountTriggerPolicy#CountTriggerPolicy(int)} this
- * constructor allows to set a custom start value for the element counter.
- * This can be used to delay the first trigger by setting a negative start
- * value. Often the first trigger should be delayed in case of sliding
- * windows. For example if the size of a window should be 4 and a trigger
- * should happen every 2, a start value of -2 would allow to also have the
- * first window of size 4.
- *
- * @param max
- * The number of arriving elements before the trigger occurs.
- * @param startValue
- * The start value for the counter of arriving elements.
- * @see CountTriggerPolicy#CountTriggerPolicy(int)
- */
- public CountTriggerPolicy(int max, int startValue) {
- this.max = max;
- this.counter = startValue;
- this.startValue = startValue;
- }
-
- @Override
- public boolean notifyTrigger(IN datapoint) {
- // The comparison have to be >= and not == to cover case max=0
- if (counter >= max) {
- // The current data point will be part of the next window!
- // Therefore the counter needs to be set to one already.
- counter = 1;
- return true;
- } else {
- counter++;
- return false;
- }
- }
-
- @Override
- public CountTriggerPolicy<IN> clone() {
- return new CountTriggerPolicy<IN>(max, counter);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof CountTriggerPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- CountTriggerPolicy<IN> otherPolicy = (CountTriggerPolicy<IN>) other;
- return max == otherPolicy.max && startValue == otherPolicy.startValue;
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- public int getSlideSize() {
- return max;
- }
-
- public int getStart() {
- return startValue;
- }
-
- @Override
- public String toString() {
- return "CountPolicy(" + max + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
deleted file mode 100644
index 0b6a493..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-
-/**
- * This policy calculates a delta between the data point which triggered last
- * and the currently arrived data point. It triggers if the delta is higher than
- * a specified threshold.
- *
- * In case it gets used for eviction, this policy starts from the first element
- * of the buffer and removes all elements from the buffer which have a higher
- * delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- *
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it calculate the delta even on
- * fake elements.
- *
- * @param <DATA>
- * The type of the data points which are handled by this policy
- */
-public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
- CloneableEvictionPolicy<DATA> {
-
- /**
- * Auto generated version ID
- */
- private static final long serialVersionUID = -7797538922123394967L;
-
- //Used for serializing the threshold
- private final static int INITIAL_SERIALIZER_BYTES = 1024;
-
- protected DeltaFunction<DATA> deltaFuntion;
- private List<DATA> windowBuffer;
- protected double threshold;
- private TypeSerializer<DATA> typeSerializer;
- protected transient DATA triggerDataPoint;
-
- /**
- * Creates a delta policy which calculates a delta between the data point
- * which triggered last and the currently arrived data point. It triggers if
- * the delta is higher than a specified threshold. As the data may be sent to
- * the cluster a {@link TypeSerializer} is needed for the initial value.
- *
- * <p>
- * In case it gets used for eviction, this policy starts from the first
- * element of the buffer and removes all elements from the buffer which have
- * a higher delta then the threshold. As soon as there is an element with a
- * lower delta, the eviction stops.
- * </p>
- *
- * @param deltaFuntion
- * The delta function to be used.
- * @param init
- * The initial to be used for the calculation of a delta before
- * the first trigger.
- * @param threshold
- * The threshold upon which a triggering should happen.
- * @param typeSerializer
- * TypeSerializer to properly forward the initial value to
- * the cluster
- */
- @SuppressWarnings("unchecked")
- public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
- this.deltaFuntion = deltaFuntion;
- this.triggerDataPoint = init;
- this.windowBuffer = new LinkedList<DATA>();
- this.threshold = threshold;
- this.typeSerializer = typeSerializer;
- }
-
- @Override
- public boolean notifyTrigger(DATA datapoint) {
- if (deltaFuntion.getDelta(this.triggerDataPoint, datapoint) > this.threshold) {
- this.triggerDataPoint = datapoint;
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
- windowBuffer = windowBuffer.subList(windowBuffer.size() - bufferSize, bufferSize);
- int evictCount = 0;
- for (DATA bufferPoint : windowBuffer) {
- if (deltaFuntion.getDelta(bufferPoint, datapoint) < this.threshold) {
- break;
- }
- evictCount++;
- }
-
- if (evictCount > 0) {
- windowBuffer = windowBuffer.subList(evictCount, windowBuffer.size());
- }
- windowBuffer.add(datapoint);
- return evictCount;
- }
-
- @Override
- public DeltaPolicy<DATA> clone() {
- return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold, typeSerializer);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof DeltaPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- DeltaPolicy<DATA> otherPolicy = (DeltaPolicy<DATA>) other;
- return threshold == otherPolicy.threshold
- && deltaFuntion.getClass() == otherPolicy.deltaFuntion.getClass()
- && triggerDataPoint.equals(otherPolicy.triggerDataPoint);
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- @Override
- public String toString() {
- return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
- }
-
- private void writeObject(ObjectOutputStream stream) throws IOException{
- stream.defaultWriteObject();
- DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(INITIAL_SERIALIZER_BYTES);
- typeSerializer.serialize(triggerDataPoint, dataOutputSerializer);
- stream.write(dataOutputSerializer.getByteArray());
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
- stream.defaultReadObject();
- byte[] bytes = new byte[stream.available()];
- stream.readFully(bytes);
- triggerDataPoint = typeSerializer.deserialize(new DataInputDeserializer(bytes, 0, bytes.length));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
deleted file mode 100644
index b95053a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.Serializable;
-
-/**
- * An eviction policy specifies under which condition data points should be
- * deleted from the buffer. Deletions must be done only in the order the
- * elements arrived. Therefore, the policy only returns the number of elements
- * to evict on each element arrival.
- *
- * @param <DATA>
- * the type of the data handled by this policy
- */
-public interface EvictionPolicy<DATA> extends Serializable {
-
- /**
- * Proves if and how many elements should be deleted from the element
- * buffer. The eviction takes place after the trigger and after the call to
- * the UDF but before the adding of the new data point.
- *
- * @param datapoint
- * data point the data point which arrived
- * @param triggered
- * Information whether the UDF was triggered or not
- * @param bufferSize
- * The current size of the element buffer at the operator
- * @return The number of elements to be deleted from the buffer
- */
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
deleted file mode 100644
index 6fad749..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-public class KeepAllEvictionPolicy<T> implements EvictionPolicy<T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public int notifyEviction(T datapoint, boolean triggered, int bufferSize) {
- return 0;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
deleted file mode 100644
index 79e8119..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This policy provides the ability to use multiple eviction policies at the
- * same time. It allows to use both, active and not active evictions.
- *
- * @param <DATA>
- * The type of data-items handled by the policies
- */
-public class MultiEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
-
- /**
- * Default version id.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * This enum provides the different options for the eviction strategy.
- *
- * You can choose from the following:
- * <ul>
- * <li>MIN: The number of elements to evict will be the smallest one which
- * is greater than 0 and was returned by any of the given policies. If all
- * policies return 0, the result is 0.</li>
- * <li>MAX: The number of elements to evict will be the greatest one which
- * was returned by any of the given policies.</li>
- * <li>SUM: The number of elements to evict will be the sum of all values
- * returned by the nested eviction policies.</li>
- * <li>PRIORITY: Depending on the order in which the policies have been
- * passed to the constructor, the first return value greater than 0 will be
- * the the number of elements to evict. If all policies return 0, the result
- * is 0.</li>
- * </ul>
- */
- public enum EvictionStrategy {
- MIN, MAX, SUM, PRIORITY
- }
-
- private List<EvictionPolicy<DATA>> allEvictionPolicies;
- private List<ActiveEvictionPolicy<DATA>> activeEvictionPolicies;
- private EvictionStrategy selectedStrategy;
-
- /**
- * This policy provides the ability to use multiple eviction policies at the
- * same time. It allows to use both, active and not active evictions.
- *
- * When using this constructor the MAX strategy is used by default. You can
- * select other strategies using
- * {@link MultiEvictionPolicy#MultiEvictionPolicy(EvictionStrategy, EvictionPolicy...)}
- * .
- *
- * @param evictionPolicies
- * Any active or not active eviction policies. Both types can be
- * used at the same time.
- */
- public MultiEvictionPolicy(EvictionPolicy<DATA>... evictionPolicies) {
- this(EvictionStrategy.MAX, evictionPolicies);
- }
-
- /**
- * This policy provides the ability to use multiple eviction policies at the
- * same time. It allows to use both, active and not active evictions.
- *
- * @param strategy
- * the strategy to be used. See {@link EvictionStrategy} for a
- * list of possible options.
- * @param evictionPolicies
- * Any active or not active eviction policies. Both types can be
- * used at the same time.
- */
- public MultiEvictionPolicy(EvictionStrategy strategy, EvictionPolicy<DATA>... evictionPolicies) {
- // initialize lists of policies
- this.allEvictionPolicies = new LinkedList<EvictionPolicy<DATA>>();
- this.activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<DATA>>();
-
- // iterate over policies and add them to the lists
- for (EvictionPolicy<DATA> ep : evictionPolicies) {
- this.allEvictionPolicies.add(ep);
- if (ep instanceof ActiveEvictionPolicy) {
- this.activeEvictionPolicies.add((ActiveEvictionPolicy<DATA>) ep);
- }
- }
-
- // Remember eviction strategy
- this.selectedStrategy = strategy;
- }
-
- @Override
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
- LinkedList<Integer> results = new LinkedList<Integer>();
- for (EvictionPolicy<DATA> policy : allEvictionPolicies) {
- results.add(policy.notifyEviction(datapoint, triggered, bufferSize));
- }
- return getNumToEvict(results);
- }
-
- @Override
- public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
- LinkedList<Integer> results = new LinkedList<Integer>();
- for (ActiveEvictionPolicy<DATA> policy : activeEvictionPolicies) {
- results.add(policy.notifyEvictionWithFakeElement(datapoint, bufferSize));
- }
- return getNumToEvict(results);
- }
-
- private int getNumToEvict(LinkedList<Integer> items) {
- int result;
- switch (selectedStrategy) {
-
- case MIN:
- result = Integer.MAX_VALUE;
- for (Integer item : items) {
- if (result > item) {
- result = item;
- }
- }
- return result;
-
- case MAX:
- result = 0;
- for (Integer item : items) {
- if (result < item) {
- result = item;
- }
- }
- return result;
-
- case SUM:
- result = 0;
- for (Integer item : items) {
- result += item;
- }
- return result;
-
- case PRIORITY:
- for (Integer item : items) {
- if (item > 0) {
- return item;
- }
- }
- return 0;
-
- default:
- // The following line should never be reached. Just for the
- // compiler.
- return 0;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
deleted file mode 100644
index a3c6a22..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This class allows to use multiple trigger policies at the same time. It
- * allows to use both, active and not active triggers.
- *
- * @param <DATA>
- * the data type handled by this policy
- */
-public class MultiTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA> {
-
- /**
- * Default version id.
- */
- private static final long serialVersionUID = 1L;
-
- private List<TriggerPolicy<DATA>> allTriggerPolicies;
- private List<ActiveTriggerPolicy<DATA>> activeTriggerPolicies;
-
- /**
- * This policy allows to use multiple trigger policies at the same time. It
- * allows to use both, active and not active triggers.
- *
- * This policy triggers in case at least one of the nested policies
- * triggered. If active policies are nested all produces fake elements will
- * be returned.
- *
- * @param policies
- * Any active or not active trigger policies. Both types can be
- * used at the same time.
- */
- public MultiTriggerPolicy(TriggerPolicy<DATA>... policies) {
- allTriggerPolicies = new LinkedList<TriggerPolicy<DATA>>();
- activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<DATA>>();
-
- for (TriggerPolicy<DATA> policy : policies) {
- this.allTriggerPolicies.add(policy);
- if (policy instanceof ActiveTriggerPolicy) {
- this.activeTriggerPolicies.add((ActiveTriggerPolicy<DATA>) policy);
- }
- }
- }
-
- @Override
- public boolean notifyTrigger(DATA datapoint) {
- boolean trigger = false;
- for (TriggerPolicy<DATA> policy : allTriggerPolicies) {
- if (policy.notifyTrigger(datapoint)) {
- trigger = true;
- // Do not at a break here. All trigger must see the element!
- }
- }
- return trigger;
- }
-
- @Override
- public Object[] preNotifyTrigger(DATA datapoint) {
- List<Object> fakeElements = new LinkedList<Object>();
- for (ActiveTriggerPolicy<DATA> policy : activeTriggerPolicies) {
- for (Object fakeElement : policy.preNotifyTrigger(datapoint)) {
- fakeElements.add(fakeElement);
- }
- }
- return fakeElements.toArray();
- }
-
- @Override
- public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
- List<Runnable> runnables = new LinkedList<Runnable>();
- for (ActiveTriggerPolicy<DATA> policy : activeTriggerPolicies) {
- Runnable tmp = policy.createActiveTriggerRunnable(callback);
- if (tmp != null) {
- runnables.add(tmp);
- }
- }
- if (runnables.size() == 0) {
- return null;
- } else {
- return new MultiActiveTriggerRunnable(runnables);
- }
- }
-
- /**
- * This class serves a nest for all active trigger runnables. Once the run
- * method gets executed, all the runnables are started in own threads.
- */
- private class MultiActiveTriggerRunnable implements Runnable {
-
- List<Runnable> runnables;
-
- MultiActiveTriggerRunnable(List<Runnable> runnables) {
- this.runnables = runnables;
- }
-
- @Override
- public void run() {
- for (Runnable runnable : runnables) {
- new Thread(runnable).start();
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
deleted file mode 100644
index eaa8063..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This policy can be used to trigger and evict based on a punctuation which is
- * present within the arriving data. Using this policy, one can react on an
- * externally defined arbitrary windowing semantic.
- *
- * In case this policy is used for eviction, the complete buffer will get
- * deleted in case the punctuation is detected.
- *
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it react on punctuation even in
- * fake elements.
- *
- * @param <IN>
- * The type of the input data handled by this policy. An
- * {@link Extractor} can be used to extract DATA for IN.
- * @param <DATA>
- * The type of the punctuation. An {@link Extractor} can be used to
- * extract DATA for IN.
- */
-public class PunctuationPolicy<IN, DATA> implements CloneableTriggerPolicy<IN>,
- CloneableEvictionPolicy<IN> {
-
- /**
- * auto generated version id
- */
- private static final long serialVersionUID = -8845130188912602498L;
- private int counter = 0;
- private Extractor<IN, DATA> extractor;
- private DATA punctuation;
-
- /**
- * Creates the punctuation policy without using any extractor. To make this
- * work IN and DATA must not be different types.
- *
- * @param punctuation
- * the punctuation which leads to trigger/evict.
- */
- public PunctuationPolicy(DATA punctuation) {
- this(punctuation, null);
- }
-
- /**
- * Creates the punctuation policy which uses the specified extractor to
- * isolate the punctuation from the data.
- *
- * @param punctuation
- * the punctuation which leads to trigger/evict.
- * @param extractor
- * An {@link Extractor} which converts IN to DATA.
- */
- public PunctuationPolicy(DATA punctuation, Extractor<IN, DATA> extractor) {
- this.punctuation = punctuation;
- this.extractor = extractor;
- }
-
- @Override
- public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
- if (notifyTrigger(datapoint)) {
- int tmp = counter;
- // As the current will be add after the eviction the counter needs
- // to be set to one already
- counter = 1;
- return tmp;
- } else {
- counter++;
- return 0;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean notifyTrigger(IN datapoint) {
- DATA tmp;
-
- // eventually extract data
- if (extractor == null) {
- // unchecked convert (cannot check it here)
- tmp = (DATA) datapoint;
- } else {
- tmp = extractor.extract(datapoint);
- }
-
- // compare data with punctuation
- if (punctuation.equals(tmp)) {
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public PunctuationPolicy<IN, DATA> clone() {
- return new PunctuationPolicy<IN, DATA>(punctuation, extractor);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof PunctuationPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- PunctuationPolicy<IN, DATA> otherPolicy = (PunctuationPolicy<IN, DATA>) other;
- if (extractor != null) {
- return extractor.getClass() == otherPolicy.extractor.getClass()
- && punctuation.equals(otherPolicy.punctuation);
- } else {
- return punctuation.equals(otherPolicy.punctuation)
- && otherPolicy.extractor == null;
- }
-
- } catch (Exception e) {
- return false;
- }
- }
- }
-
- @Override
- public String toString() {
- return "PunctuationPolicy(" + punctuation
- + (extractor != null
- ? ", " + extractor.getClass().getSimpleName()
- : "")
- + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
deleted file mode 100644
index ae17e29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * This eviction policy evicts all elements which are older then a specified
- * time. The time is measured using a given {@link Timestamp} implementation. A
- * point in time is always represented as long. Therefore, the granularity can
- * be set as long value as well.
- *
- * @param <DATA>
- * The type of the incoming data points which are processed by this
- * policy.
- */
-public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
- CloneableEvictionPolicy<DATA> {
-
- /**
- * auto generated version id
- */
- private static final long serialVersionUID = -1457476766124518220L;
-
- private long granularity;
- private TimestampWrapper<DATA> timestampWrapper;
- private LinkedList<Long> buffer = new LinkedList<Long>();
-
- /**
- * This eviction policy evicts all elements which are older than a specified
- * time. The time is measured using a given {@link Timestamp}
- * implementation. A point in time is always represented as long. Therefore,
- * the granularity can be set as long value as well. If this value is set to
- * 2 the policy will evict all elements which are older as 2.
- *
- * <code>
- * while (time(firstInBuffer)<current time-granularity){
- * evict firstInBuffer;
- * }
- * </code>
- *
- * @param granularity
- * The granularity of the eviction. If this value is set to 2 the
- * policy will evict all elements which are older as 2(if
- * (time(X)<current time-granularity) evict X).
- * @param timestampWrapper
- * The {@link TimestampWrapper} to measure the time with. This
- * can be either user defined of provided by the API.
- */
- public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
- this.timestampWrapper = timestampWrapper;
- this.granularity = granularity;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
- checkForDeleted(bufferSize);
-
- long threshold;
- try {
- threshold = (Long) datapoint - granularity;
- } catch (ClassCastException e) {
- threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity;
- }
-
- // return result
- return deleteAndCountExpired(threshold);
-
- }
-
- @Override
- public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-
- checkForDeleted(bufferSize);
-
- // remember timestamp
- long time = timestampWrapper.getTimestamp(datapoint);
-
- // delete and count expired tuples
- long threshold = time - granularity;
- int counter = deleteAndCountExpired(threshold);
-
- // Add current element to buffer
- buffer.add(time);
-
- // return result
- return counter;
-
- }
-
- private void checkForDeleted(int bufferSize) {
- // check for deleted tuples (deletes by other policies)
- while (bufferSize < this.buffer.size()) {
- this.buffer.removeFirst();
- }
- }
-
- private int deleteAndCountExpired(long threshold) {
- int counter = 0;
- while (!buffer.isEmpty()) {
-
- if (buffer.getFirst() <= threshold) {
- buffer.removeFirst();
- counter++;
- } else {
- break;
- }
- }
- return counter;
-
- }
-
- @Override
- public TimeEvictionPolicy<DATA> clone() {
- return new TimeEvictionPolicy<DATA>(granularity, timestampWrapper);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof TimeEvictionPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- TimeEvictionPolicy<DATA> otherPolicy = (TimeEvictionPolicy<DATA>) other;
- return granularity == otherPolicy.granularity
- && timestampWrapper.equals(otherPolicy.timestampWrapper);
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- public long getWindowSize() {
- return granularity;
- }
-
- @Override
- public String toString() {
- return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
- + ")";
- }
-
- public TimestampWrapper<DATA> getTimeStampWrapper() {
- return timestampWrapper;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
deleted file mode 100644
index 03984a9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * This trigger policy triggers with regard to the time. The is measured using a
- * given {@link Timestamp} implementation. A point in time is always represented
- * as long. Therefore, parameters such as granularity and delay can be set as
- * long value as well.
- *
- * @param <DATA>
- * The type of the incoming data points which are processed by this
- * policy.
- */
-public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
- CloneableTriggerPolicy<DATA>, CentralActiveTrigger<DATA> {
-
- /**
- * auto generated version id
- */
- private static final long serialVersionUID = -5122753802440196719L;
-
- protected long startTime;
- public long granularity;
- public TimestampWrapper<DATA> timestampWrapper;
-
- /**
- * This is mostly the same as
- * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
- * to granularity and timestamp a delay can be specified for the first
- * trigger. If the start time given by the timestamp is x, the delay is y,
- * and the granularity is z, the first trigger will happen at x+y+z.
- *
- * @param granularity
- * The granularity of the trigger. If this value is set to 2 the
- * policy will trigger at every second time point
- * @param timestampWrapper
- * The {@link TimestampWrapper} to measure the time with. This
- * can be either user defined of provided by the API.
- */
- public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
- this.startTime = timestampWrapper.getStartTime();
- this.timestampWrapper = timestampWrapper;
- this.granularity = granularity;
- }
-
- /**
- * This method checks if we missed a window end. If this is the case we
- * trigger the missed windows using fake elements.
- */
- @Override
- public synchronized Object[] preNotifyTrigger(DATA datapoint) {
- LinkedList<Object> fakeElements = new LinkedList<Object>();
- // check if there is more then one window border missed
- // use > here. In case >= would fit, the regular call will do the job.
- while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
- startTime += granularity;
- fakeElements.add(startTime - 1);
- }
- return (Object[]) fakeElements.toArray();
- }
-
- /**
- * In case {@link SystemTimestamp} is used, a runnable is returned which
- * triggers based on the current system time. If any other time measure is
- * used the method returns null.
- *
- * @param callback
- * The object which is takes the callbacks for adding fake
- * elements out of the runnable.
- * @return A runnable is returned which triggers based on the current system
- * time. If any other time measure is used the method return null.
- */
- @Override
- public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
- if (this.timestampWrapper.isDefaultTimestamp()) {
- return new TimeCheck(callback);
- } else {
- return null;
- }
- }
-
- /**
- * This method is only called in case the runnable triggers a window end
- * according to the {@link SystemTimestamp}.
- *
- * @param callback
- * The callback object.
- */
- public synchronized Object activeFakeElementEmission(ActiveTriggerCallback callback) {
-
- // start time is excluded, but end time is included: >=
- if (System.currentTimeMillis() >= startTime + granularity) {
- startTime += granularity;
- if (callback != null) {
- callback.sendFakeElement(startTime - 1);
- }
- return startTime - 1;
- }
- return null;
-
- }
-
- private class TimeCheck implements Runnable {
- ActiveTriggerCallback callback;
-
- public TimeCheck(ActiveTriggerCallback callback) {
- this.callback = callback;
- }
-
- @Override
- public void run() {
- while (true) {
- // wait for the specified granularity
- try {
- Thread.sleep(granularity);
- } catch (InterruptedException e) {
- // ignore it...
- }
- // Trigger using the respective methods. Methods are
- // synchronized to prevent race conditions between real and fake
- // elements at the policy.
- activeFakeElementEmission(callback);
- }
- }
- }
-
- @Override
- public synchronized boolean notifyTrigger(DATA datapoint) {
- long recordTime = timestampWrapper.getTimestamp(datapoint);
- if (recordTime >= startTime + granularity) {
- if (granularity != 0) {
- startTime = recordTime - ((recordTime - startTime) % granularity);
- }
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public TimeTriggerPolicy<DATA> clone() {
- return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof TimeTriggerPolicy)) {
- return false;
- } else {
- try {
- @SuppressWarnings("unchecked")
- TimeTriggerPolicy<DATA> otherPolicy = (TimeTriggerPolicy<DATA>) other;
- return startTime == otherPolicy.startTime && granularity == otherPolicy.granularity
- && timestampWrapper.equals(otherPolicy.timestampWrapper);
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- public long getSlideSize() {
- return granularity;
- }
-
- @Override
- public String toString() {
- return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
- + ")";
- }
-
- public TimestampWrapper<DATA> getTimeStampWrapper() {
- return timestampWrapper;
- }
-
- @Override
- public Object[] notifyOnLastGlobalElement(DATA datapoint) {
- LinkedList<Object> fakeElements = new LinkedList<Object>();
- // check if there is more then one window border missed
- // use > here. In case >= would fit, the regular call will do the job.
- while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
- startTime += granularity;
- fakeElements.add(startTime - 1);
- }
- return (Object[]) fakeElements.toArray();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
deleted file mode 100644
index c212df6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.Serializable;
-
-/**
- * Proves and returns if a new window should be started. In case the trigger
- * occurs (return value true) the UDF will be executed on the current element
- * buffer without the last added element which is provided as parameter. This
- * element will be added to the buffer after the execution of the UDF.
- *
- * @param <DATA>
- * The data type which can be handled by this policy
- */
-public interface TriggerPolicy<DATA> extends Serializable {
-
- /**
- * Proves and returns if a new window should be started. In case the trigger
- * occurs (return value true) the UDF will be executed on the current
- * element buffer without the last added element which is provided as
- * parameter. This element will be added to the buffer after the execution
- * of the UDF.
- *
- * There are possibly different strategies for eviction and triggering: 1)
- * including last data point: Better/faster for count eviction 2) excluding
- * last data point: Essentially required for time based eviction and delta
- * rules As 2) is required for some policies and the benefit of using 1) is
- * small for the others, policies are implemented according to 2).
- *
- * @param datapoint
- * the data point which arrived
- * @return true if the current windows should be closed, otherwise false. In
- * true case the given data point will be part of the next window
- * and will not be included in the current one.
- */
- public boolean notifyTrigger(DATA datapoint);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
deleted file mode 100644
index 08c49e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This eviction policy deletes all elements from the buffer in case a trigger
- * occurred. Therefore, it is the default eviction policy to be used for any
- * tumbling window.
- *
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it clearing the buffer even on
- * fake elements.
- *
- * @param <DATA>
- * The type of the data points which is handled by this policy
- */
-public class TumblingEvictionPolicy<DATA> implements CloneableEvictionPolicy<DATA> {
-
- /**
- * Auto generated version ID
- */
- private static final long serialVersionUID = -4018019069267281155L;
-
- /**
- * Counter for the current number of elements in the buffer
- */
- private int counter = 0;
-
- /**
- * This is the default constructor providing no special functionality. This
- * eviction policy deletes all elements from the buffer in case a trigger
- * occurred. Therefore, it is the default eviction policy to be used for any
- * tumbling window.
- */
- public TumblingEvictionPolicy() {
- // default constructor, no further logic needed
- }
-
- /**
- * This constructor allows to set a custom start value for the element
- * counter.
- *
- * This eviction policy deletes all elements from the buffer in case a
- * trigger occurred. Therefore, it is the default eviction policy to be used
- * for any tumbling window.
- *
- * @param startValue
- * A start value for the element counter
- */
- public TumblingEvictionPolicy(int startValue) {
- this.counter = startValue;
- }
-
- /**
- * Deletes all elements from the buffer in case the trigger occurred.
- */
- @Override
- public int notifyEviction(Object datapoint, boolean triggered, int bufferSize) {
- if (triggered) {
- // The current data point will be part of the next window!
- // Therefore the counter needs to be set to one already.
- int tmpCounter = counter;
- counter = 1;
- return tmpCounter;
- } else {
- counter++;
- return 0;
- }
- }
-
- @Override
- public TumblingEvictionPolicy<DATA> clone() {
- return new TumblingEvictionPolicy<DATA>(counter);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof TumblingEvictionPolicy)) {
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public String toString() {
- return "TumblingPolicy";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
deleted file mode 100644
index 33fb29d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Basic window buffer that stores the elements in a simple list without any
- * pre-aggregation.
- */
-public class BasicWindowBuffer<T> extends WindowBuffer<T> {
-
- private static final long serialVersionUID = 1L;
- protected LinkedList<T> buffer;
-
- public BasicWindowBuffer() {
- this.buffer = new LinkedList<T>();
- }
-
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- if (emitEmpty || !buffer.isEmpty()) {
- StreamWindow<T> currentWindow = createEmptyWindow();
- currentWindow.addAll(buffer);
- collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
- }
- }
-
- public void store(T element) throws Exception {
- buffer.add(element);
- }
-
- public void evict(int n) {
- for (int i = 0; i < n; i++) {
- try {
- buffer.removeFirst();
- } catch (NoSuchElementException e) {
- // In case no more elements are in the buffer:
- // Prevent failure and stop deleting.
- break;
- }
- }
- }
-
- @Override
- public BasicWindowBuffer<T> clone() {
- return new BasicWindowBuffer<T>();
- }
-
- @Override
- public String toString() {
- return buffer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
deleted file mode 100644
index 195a966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final long countToSkip; // How many elements should be jumped over
- private long skipped = 0; // How many elements have we skipped since the last emitWindow
-
- public JumpingCountGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
- TypeSerializer<T> serializer, long countToSkip) {
- super(reducer, keySelector, serializer);
- this.countToSkip = countToSkip;
- }
-
- @Override
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- super.emitWindow(collector);
- skipped = 0;
- }
-
- @Override
- public void store(T element) throws Exception {
- if(skipped == countToSkip){
- super.store(element);
- } else {
- skipped++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
deleted file mode 100644
index 17fe408..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for jumping time eviction policy
- * (the policies are based on count, and the slide size is larger than the window size).
- */
-public class JumpingCountPreReducer<T> extends TumblingPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final long countToSkip; // How many elements should be jumped over
- private long skipped = 0; // How many elements have we skipped since the last emitWindow
-
- public JumpingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer, long countToSkip){
- super(reducer, serializer);
- this.countToSkip = countToSkip;
- }
-
- @Override
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- super.emitWindow(collector);
- skipped = 0;
- }
-
- @Override
- public void store(T element) throws Exception {
- if(skipped == countToSkip){
- super.store(element);
- } else {
- skipped++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
deleted file mode 100644
index a92fc98..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private TimestampWrapper<T> timestampWrapper;
- protected long windowStartTime;
- private long slideSize;
-
- public JumpingTimeGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
- TypeSerializer<T> serializer,
- long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
- super(reducer, keySelector, serializer);
- this.timestampWrapper = timestampWrapper;
- this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
- this.slideSize = slideSize;
- }
-
- @Override
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- super.emitWindow(collector);
- windowStartTime += slideSize;
- }
-
- public void store(T element) throws Exception {
- if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
- super.store(element);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
deleted file mode 100644
index 1a47bc8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for jumping time eviction policy
- * (the policies are based on time, and the slide size is larger than the window size).
- */
-public class JumpingTimePreReducer<T> extends TumblingPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private TimestampWrapper<T> timestampWrapper;
- protected long windowStartTime;
- private long slideSize;
-
- public JumpingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
- super(reducer, serializer);
- this.timestampWrapper = timestampWrapper;
- this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize;
- this.slideSize = slideSize;
- }
-
- @Override
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- super.emitWindow(collector);
- windowStartTime += slideSize;
- }
-
- public void store(T element) throws Exception {
- if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
- super.store(element);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
deleted file mode 100644
index 1b95248..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-/**
- * Interface for marking window pre-aggregators that fully process the window so
- * that no further reduce step is necessary afterwards.
- */
-public interface PreAggregator {
-
-}