You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:41 UTC
[05/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and
API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
deleted file mode 100644
index 8d690cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private long windowSize;
- private long slideSize;
- private int start;
-
- protected long index = 0;
-
- public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
- super(reducer, serializer, key);
- if (windowSize > slideSize) {
- this.windowSize = windowSize;
- this.slideSize = slideSize;
- this.start = start;
- } else {
- throw new RuntimeException(
- "Window size needs to be larger than slide size for the sliding pre-reducer");
- }
- index = index - start;
- }
-
- @Override
- protected void afterStore() {
- index++;
- }
-
- @Override
- public void store(T element) throws Exception {
- if (index >= 0) {
- super.store(element);
- } else {
- index++;
- }
- }
-
- @Override
- protected boolean currentEligible(T next) {
- if (index <= slideSize) {
- return true;
- } else {
- return index == windowSize;
- }
- }
-
- @Override
- protected void afterEmit() {
- if (index >= slideSize) {
- index = index - slideSize;
- }
- }
-
- @Override
- public SlidingCountGroupedPreReducer<T> clone() {
- return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
- slideSize, start);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
deleted file mode 100644
index db14eb0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private long windowSize;
- private long slideSize;
- private int start;
-
- protected long index = 0;
-
- public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- long windowSize, long slideSize, int start) {
- super(reducer, serializer);
- if (windowSize > slideSize) {
- this.windowSize = windowSize;
- this.slideSize = slideSize;
- this.start = start;
- } else {
- throw new RuntimeException(
- "Window size needs to be larger than slide size for the sliding pre-reducer");
- }
- index = index - start;
- }
-
- @Override
- protected void afterStore() {
- index++;
- }
-
- @Override
- public void store(T element) throws Exception {
- if (index >= 0) {
- super.store(element);
- } else {
- index++;
- }
- }
-
- @Override
- protected boolean currentEligible(T next) {
- if (index <= slideSize) {
- return true;
- } else {
- return index == windowSize;
- }
- }
-
- @Override
- protected void afterEmit() {
- if (index >= slideSize) {
- index = index - slideSize;
- }
- }
-
- @Override
- public SlidingCountPreReducer<T> clone() {
- return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
deleted file mode 100644
index 6e5462c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Grouped pre-reducer for sliding eviction policy
- * (the slide size is smaller than the window size).
- */
-public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
- protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
-
- protected KeySelector<T, ?> key;
-
- public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- KeySelector<T, ?> key) {
- super(reducer, serializer);
- this.key = key;
- }
-
- public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
- Map<Object, T> finalReduce = null;
-
- if (!reducedMap.isEmpty()) {
- finalReduce = reducedMap.get(0);
- for (int i = 1; i < reducedMap.size(); i++) {
- finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
-
- }
- if (currentReducedMap != null) {
- finalReduce = reduceMaps(finalReduce, currentReducedMap);
- }
-
- } else {
- finalReduce = currentReducedMap;
- }
-
- if (finalReduce != null) {
- currentWindow.addAll(finalReduce.values());
- return true;
- } else {
- return false;
- }
-
- }
-
- private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
-
- Map<Object, T> reduced = new HashMap<Object, T>();
-
- // Get the common keys in the maps
- Set<Object> interSection = new HashSet<Object>();
- Set<Object> diffFirst = new HashSet<Object>();
- Set<Object> diffSecond = new HashSet<Object>();
-
- for (Object key : first.keySet()) {
- if (second.containsKey(key)) {
- interSection.add(key);
- } else {
- diffFirst.add(key);
- }
- }
-
- for (Object key : second.keySet()) {
- if (!interSection.contains(key)) {
- diffSecond.add(key);
- }
- }
-
- // Reduce the common keys
- for (Object key : interSection) {
- reduced.put(
- key,
- reducer.reduce(serializer.copy(first.get(key)),
- serializer.copy(second.get(key))));
- }
-
- for (Object key : diffFirst) {
- reduced.put(key, first.get(key));
- }
-
- for (Object key : diffSecond) {
- reduced.put(key, second.get(key));
- }
-
- return reduced;
- }
-
- protected void updateCurrent(T element) throws Exception {
- if (currentReducedMap == null) {
- currentReducedMap = new HashMap<Object, T>();
- currentReducedMap.put(key.getKey(element), element);
- } else {
- Object nextKey = key.getKey(element);
- T last = currentReducedMap.get(nextKey);
- if (last == null) {
- currentReducedMap.put(nextKey, element);
- } else {
- currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
- }
- }
- }
-
- @Override
- protected void removeLastReduced() {
- reducedMap.removeFirst();
- }
-
- @Override
- protected void addCurrentToBuffer(T element) throws Exception {
- reducedMap.add(currentReducedMap);
- }
-
- @Override
- protected void resetCurrent() {
- currentReducedMap = null;
- elementsSinceLastPreAggregate = 0;
- }
-
- @Override
- protected boolean currentNotEmpty() {
- return currentReducedMap != null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
deleted file mode 100644
index e2c46a3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for sliding eviction policy
- * (the slide size is smaller than the window size).
- */
-public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
- private static final long serialVersionUID = 1L;
-
- protected ReduceFunction<T> reducer;
-
- protected T currentReduced;
- protected LinkedList<T> reduced = new LinkedList<T>();
- protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
-
- protected TypeSerializer<T> serializer;
-
- protected int toRemove = 0;
-
- protected int elementsSinceLastPreAggregate = 0;
-
- public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
- this.reducer = reducer;
- this.serializer = serializer;
- }
-
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- StreamWindow<T> currentWindow = createEmptyWindow();
-
- try {
- if (addFinalAggregate(currentWindow) || emitEmpty) {
- collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
- }
- afterEmit();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- protected void afterEmit() {
- // Do nothing by default
- }
-
- public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
- T finalReduce = null;
-
- if (!reduced.isEmpty()) {
- finalReduce = reduced.get(0);
- for (int i = 1; i < reduced.size(); i++) {
- finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
-
- }
- if (currentReduced != null) {
- finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
- }
-
- } else {
- finalReduce = currentReduced;
- }
-
- if (finalReduce != null) {
- currentWindow.add(finalReduce);
- return true;
- } else {
- return false;
- }
-
- }
-
- public void store(T element) throws Exception {
- addToBufferIfEligible(element);
- afterStore();
- }
-
- protected void afterStore() {
- // Do nothing by default
- }
-
- protected void addToBufferIfEligible(T element) throws Exception {
- if (currentEligible(element) && currentNotEmpty()) {
- addCurrentToBuffer(element);
- elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
- elementsSinceLastPreAggregate = 0;
- resetCurrent();
- }
- updateCurrent(element);
-
- elementsSinceLastPreAggregate++;
- }
-
- protected void resetCurrent() {
- currentReduced = null;
- }
-
- protected boolean currentNotEmpty() {
- return currentReduced != null;
- }
-
- protected void updateCurrent(T element) throws Exception {
- if (currentReduced == null) {
- currentReduced = element;
- } else {
- currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
- }
- }
-
- protected void addCurrentToBuffer(T element) throws Exception {
- reduced.add(currentReduced);
- }
-
- protected abstract boolean currentEligible(T next);
-
- public void evict(int n) {
- toRemove += n;
-
- Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
- while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
- toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
- removeLastReduced();
- lastPreAggregateSize = elementsPerPreAggregate.peek();
- }
-
- if (lastPreAggregateSize == null) {
- toRemove = 0;
- }
- }
-
- protected void removeLastReduced() {
- reduced.removeFirst();
- }
-
- public static int max(int a, int b) {
- if (a > b) {
- return a;
- } else {
- return b;
- }
- }
-
- @Override
- public abstract SlidingPreReducer<T> clone();
-
- @Override
- public String toString() {
- return currentReduced.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
deleted file mode 100644
index cdb4207..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * Grouped pre-reducer for sliding time eviction policy.
- */
-public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private long windowSize;
- private long slideSize;
- private TimestampWrapper<T> timestampWrapper;
- private T lastStored;
- protected long windowStartTime;
-
- public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- KeySelector<T, ?> key, long windowSize, long slideSize,
- TimestampWrapper<T> timestampWrapper) {
- super(reducer, serializer, key);
- if (windowSize > slideSize) {
- this.windowSize = windowSize;
- this.slideSize = slideSize;
- } else {
- throw new RuntimeException(
- "Window size needs to be larger than slide size for the sliding pre-reducer");
- }
- this.timestampWrapper = timestampWrapper;
- this.windowStartTime = timestampWrapper.getStartTime();
- }
-
- @Override
- public void store(T element) throws Exception {
- super.store(element);
- lastStored = element;
- }
-
- @Override
- public SlidingTimeGroupedPreReducer<T> clone() {
- return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
- timestampWrapper);
- }
-
- @Override
- public String toString() {
- return currentReducedMap.toString();
- }
-
- @Override
- protected void afterEmit() {
- if (lastStored != null) {
- long lastTime = timestampWrapper.getTimestamp(lastStored);
- if (lastTime - windowStartTime >= slideSize) {
- windowStartTime = windowStartTime + slideSize;
- }
- }
- }
-
- @Override
- public void evict(int n) {
- toRemove += n;
- Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
-
- while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
- toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
- removeLastReduced();
- lastPreAggregateSize = elementsPerPreAggregate.peek();
- }
-
- if (toRemove > 0 && lastPreAggregateSize == null) {
- resetCurrent();
- toRemove = 0;
- }
- }
-
- @Override
- protected boolean currentEligible(T next) {
- return windowStartTime == timestampWrapper.getStartTime()
- || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
deleted file mode 100644
index d84505c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * Non-grouped pre-reducer for sliding time eviction policy
- * (the policies are based on time, and the slide size is smaller than the window size).
- */
-public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private long windowSize;
- private long slideSize;
- private TimestampWrapper<T> timestampWrapper;
- private T lastStored;
- protected long windowStartTime;
-
- public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- long windowSize, long slideSize, TimestampWrapper<T> timestampWrapper) {
- super(reducer, serializer);
- if (windowSize > slideSize) {
- this.windowSize = windowSize;
- this.slideSize = slideSize;
- } else {
- throw new RuntimeException(
- "Window size needs to be larger than slide size for the sliding pre-reducer");
- }
- this.timestampWrapper = timestampWrapper;
- this.windowStartTime = timestampWrapper.getStartTime();
- }
-
- @Override
- public void store(T element) throws Exception {
- super.store(element);
- lastStored = element;
- }
-
- @Override
- public SlidingTimePreReducer<T> clone() {
- return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
- timestampWrapper);
- }
-
- @Override
- public String toString() {
- return currentReduced.toString();
- }
-
- @Override
- protected void afterEmit() {
- if (lastStored != null) {
- long lastTime = timestampWrapper.getTimestamp(lastStored);
- if (lastTime - windowStartTime >= slideSize) {
- windowStartTime = windowStartTime + slideSize;
- }
- }
- }
-
- @Override
- public void evict(int n) {
- toRemove += n;
- Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
-
- while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
- toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
- reduced.removeFirst();
- lastPreAggregateSize = elementsPerPreAggregate.peek();
- }
-
- if (toRemove > 0 && lastPreAggregateSize == null) {
- currentReduced = null;
- elementsSinceLastPreAggregate = 0;
- toRemove = 0;
- }
- }
-
- @Override
- protected boolean currentEligible(T next) {
- return windowStartTime == timestampWrapper.getStartTime()
- || timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
deleted file mode 100644
index 37d3aae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Grouped pre-reducer for tumbling eviction policy.
- */
-public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
- private static final long serialVersionUID = 1L;
-
- private ReduceFunction<T> reducer;
- private KeySelector<T, ?> keySelector;
-
- private Map<Object, T> reducedValues;
-
- private TypeSerializer<T> serializer;
-
- private boolean evict = true;
-
- public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
- TypeSerializer<T> serializer) {
- this(reducer, keySelector, serializer, true);
- }
-
- public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
- TypeSerializer<T> serializer, boolean evict) {
- this.reducer = reducer;
- this.serializer = serializer;
- this.keySelector = keySelector;
- this.reducedValues = new HashMap<Object, T>();
- this.evict = evict;
- }
-
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-
- if (!reducedValues.isEmpty()) {
- StreamWindow<T> currentWindow = createEmptyWindow();
- currentWindow.addAll(reducedValues.values());
- collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
- } else if (emitEmpty) {
- collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
- }
- if (evict) {
- reducedValues.clear();
- }
- }
-
- public void store(T element) throws Exception {
- Object key = keySelector.getKey(element);
-
- T reduced = reducedValues.get(key);
-
- if (reduced == null) {
- reduced = element;
- } else {
- reduced = reducer.reduce(serializer.copy(reduced), element);
- }
-
- reducedValues.put(key, reduced);
- }
-
- @Override
- public void evict(int n) {
- }
-
- @Override
- public TumblingGroupedPreReducer<T> clone() {
- return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer, evict);
- }
-
- @Override
- public String toString() {
- return reducedValues.toString();
- }
-
- public TumblingGroupedPreReducer<T> noEvict() {
- this.evict = false;
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
deleted file mode 100644
index 3a10be7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the
- * same as the window size).
- */
-public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
- private static final long serialVersionUID = 1L;
-
- private ReduceFunction<T> reducer;
-
- private T reduced;
- private TypeSerializer<T> serializer;
-
- private boolean evict = true;
-
- public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
- this(reducer, serializer, true);
- }
-
- private TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
- boolean evict) {
- this.reducer = reducer;
- this.serializer = serializer;
- this.evict = evict;
- }
-
- public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
- if (reduced != null) {
- StreamWindow<T> currentWindow = createEmptyWindow();
- currentWindow.add(reduced);
- collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
- } else if (emitEmpty) {
- collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
- }
-
- if (evict) {
- reduced = null;
- }
- }
-
- public void store(T element) throws Exception {
- if (reduced == null) {
- reduced = element;
- } else {
- reduced = reducer.reduce(serializer.copy(reduced), element);
- }
- }
-
- public void evict(int n) {
- }
-
- @Override
- public TumblingPreReducer<T> clone() {
- return new TumblingPreReducer<T>(reducer, serializer, evict);
- }
-
- @Override
- public String toString() {
- return reduced.toString();
- }
-
- @Override
- public WindowBuffer<T> emitEmpty() {
- emitEmpty = true;
- return this;
- }
-
- public TumblingPreReducer<T> noEvict() {
- this.evict = false;
- return this;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
deleted file mode 100644
index 6e87d0b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Class for defining specialized buffers to store/emit window data.
- * Pre-aggregators should be implemented using this interface.
- */
-public abstract class WindowBuffer<T> implements Serializable, Cloneable {
-
- private static final long serialVersionUID = 1L;
-
- protected Integer nextID = 1;
- protected boolean sequentialID = false;
- protected boolean emitEmpty = false;
- protected boolean emitPerGroup = false;
-
- public abstract void store(T element) throws Exception;
-
- public abstract void evict(int n);
-
- public abstract void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector);
-
- public abstract WindowBuffer<T> clone();
-
- public WindowBuffer<T> emitEmpty() {
- emitEmpty = true;
- return this;
- }
-
- public WindowBuffer<T> sequentialID() {
- sequentialID = true;
- return this;
- }
-
- protected StreamWindow<T> createEmptyWindow() {
- return sequentialID ? new StreamWindow<T>(nextID++) : new StreamWindow<T>();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 0b8482d..3a224e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -41,17 +41,19 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -95,6 +97,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void flatMap1(Long value, Collector<Long> out) throws Exception {
}
@@ -103,14 +107,17 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
public void flatMap2(Long value, Collector<Long> out) throws Exception {
}
}).name("testCoFlatMap")
- .window(Count.of(10))
- .foldWindow(0L, new FoldFunction<Long, Long>() {
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+ .fold(0L, new FoldFunction<Long, Long>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Long fold(Long accumulator, Long value) throws Exception {
return null;
}
- }).name("testWindowFold")
- .flatten()
+ })
+ .name("testWindowFold")
.print();
//test functionality through the operator names in the execution plan
@@ -133,15 +140,15 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
public void testPartitioning() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
- DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
- ConnectedStreams connected = src1.connect(src2);
+ DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+ DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);
//Testing DataStream grouping
- DataStream group1 = src1.keyBy(0);
- DataStream group2 = src1.keyBy(1, 0);
- DataStream group3 = src1.keyBy("f0");
- DataStream group4 = src1.keyBy(new FirstSelector());
+ DataStream<Tuple2<Long, Long>> group1 = src1.keyBy(0);
+ DataStream<Tuple2<Long, Long>> group2 = src1.keyBy(1, 0);
+ DataStream<Tuple2<Long, Long>> group3 = src1.keyBy("f0");
+ DataStream<Tuple2<Long, Long>> group4 = src1.keyBy(new FirstSelector());
int id1 = createDownStreamId(group1);
int id2 = createDownStreamId(group2);
@@ -159,10 +166,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isKeyed(group4));
//Testing DataStream partitioning
- DataStream partition1 = src1.partitionByHash(0);
- DataStream partition2 = src1.partitionByHash(1, 0);
- DataStream partition3 = src1.partitionByHash("f0");
- DataStream partition4 = src1.partitionByHash(new FirstSelector());
+ DataStream<Tuple2<Long, Long>> partition1 = src1.partitionByHash(0);
+ DataStream<Tuple2<Long, Long>> partition2 = src1.partitionByHash(1, 0);
+ DataStream<Tuple2<Long, Long>> partition3 = src1.partitionByHash("f0");
+ DataStream<Tuple2<Long, Long>> partition4 = src1.partitionByHash(new FirstSelector());
int pid1 = createDownStreamId(partition1);
int pid2 = createDownStreamId(partition2);
@@ -187,9 +194,9 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
};
- DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0);
- DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
- DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
+ DataStream<Tuple2<Long, Long>> customPartition1 = src1.partitionCustom(longPartitioner, 0);
+ DataStream<Tuple2<Long, Long>> customPartition3 = src1.partitionCustom(longPartitioner, "f0");
+ DataStream<Tuple2<Long, Long>> customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
int cid1 = createDownStreamId(customPartition1);
int cid2 = createDownStreamId(customPartition3);
@@ -204,19 +211,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertFalse(isKeyed(customPartition4));
//Testing ConnectedStreams grouping
- ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup1 = connected.keyBy(0, 0);
Integer downStreamId1 = createDownStreamId(connectedGroup1);
- ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
Integer downStreamId2 = createDownStreamId(connectedGroup2);
- ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup3 = connected.keyBy("f0", "f0");
Integer downStreamId3 = createDownStreamId(connectedGroup3);
- ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
Integer downStreamId4 = createDownStreamId(connectedGroup4);
- ConnectedStreams connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
Integer downStreamId5 = createDownStreamId(connectedGroup5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -241,19 +248,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isKeyed(connectedGroup5));
//Testing ConnectedStreams partitioning
- ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.partitionByHash(0, 0);
Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
- ConnectedStreams connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
- ConnectedStreams connectedPartition3 = connected.partitionByHash("f0", "f0");
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.partitionByHash("f0", "f0");
Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
- ConnectedStreams connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
- ConnectedStreams connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+ ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
@@ -295,7 +302,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
public void testParallelism() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L, 0L));
env.setParallelism(10);
SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
@@ -306,18 +313,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}).name("MyMap");
DataStream<Long> windowed = map
- .window(Count.of(10))
- .foldWindow(0L, new FoldFunction<Long, Long>() {
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+ .fold(0L, new FoldFunction<Long, Long>() {
@Override
public Long fold(Long accumulator, Long value) throws Exception {
return null;
}
- })
- .flatten();
+ });
windowed.addSink(new NoOpSink<Long>());
DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void invoke(Long value) throws Exception {
}
@@ -343,6 +352,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
src.setParallelism(3);
fail();
} catch (IllegalArgumentException success) {
+ // do nothing
}
DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
@@ -373,26 +383,33 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
});
- assertEquals(TypeExtractor.getForObject(new Tuple2<Integer, String>(0, "")), map.getType());
+ assertEquals(TypeExtractor.getForObject(new Tuple2<>(0, "")), map.getType());
- WindowedDataStream<String> window = map
- .window(Count.of(5))
- .mapWindow(new WindowMapFunction<Tuple2<Integer, String>, String>() {
+ DataStream<String> window = map
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(5)))
+ .apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
@Override
- public void mapWindow(Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
+ public void apply(GlobalWindow window,
+ Iterable<Tuple2<Integer, String>> values,
+ Collector<String> out) throws Exception {
+
}
});
assertEquals(TypeExtractor.getForClass(String.class), window.getType());
DataStream<CustomPOJO> flatten = window
- .foldWindow(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(5)))
+ .fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
return null;
}
- })
- .flatten();
+ });
assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
}
@@ -415,6 +432,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void flatMap(Long value, Collector<Integer> out) throws Exception {
}
@@ -430,8 +449,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
};
- DataStream<Integer> unionFilter = map
- .union(flatMap)
+ DataStream<Integer> unionFilter = map.union(flatMap)
.filter(filterFunction);
unionFilter.addSink(new NoOpSink<Integer>());
@@ -471,6 +489,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public String map1(Integer value) {
return null;
@@ -597,16 +617,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return operator.getUserFunction();
}
- private static Integer createDownStreamId(DataStream dataStream) {
+ private static Integer createDownStreamId(DataStream<?> dataStream) {
return dataStream.print().getTransformation().getId();
}
- private static boolean isKeyed(DataStream dataStream) {
+ private static boolean isKeyed(DataStream<?> dataStream) {
return dataStream instanceof KeyedStream;
}
+ @SuppressWarnings("rawtypes,unchecked")
private static Integer createDownStreamId(ConnectedStreams dataStream) {
- SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+ SingleOutputStreamOperator<?, ?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Object map1(Tuple2<Long, Long> value) {
return null;
@@ -621,7 +644,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return coMap.getId();
}
- private static boolean isKeyed(ConnectedStreams dataStream) {
+ private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
}
@@ -634,6 +657,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
+ private static final long serialVersionUID = 1L;
+
@Override
public Long getKey(Tuple2<Long, Long> value) throws Exception {
return value.f0;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 5e46508..2775299 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -33,15 +33,18 @@ import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.After;
@@ -59,6 +62,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings("serial")
public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@@ -117,8 +121,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
- Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(
- 0L, new Tuple2<String, Long>("", 0L));
+ Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(
+ 0L, new Tuple2<>("", 0L));
@Override
public Tuple2<Long, Tuple2<String, Long>> map(
@@ -167,38 +171,38 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
"peach-d\n" + "peach-d\n";
List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
- new Tuple5<Integer, String, Character, Double, Boolean>(1, "apple", 'j', 0.1, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(1, "peach", 'b', 0.8, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(1, "orange", 'c', 0.7, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'd', 0.5, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(2, "peach", 'j', 0.6, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(3, "orange", 'b', 0.2, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(6, "apple", 'c', 0.1, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(7, "peach", 'd', 0.4, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(8, "orange", 'j', 0.2, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(10, "apple", 'b', 0.1, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(10, "peach", 'c', 0.5, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(11, "orange", 'd', 0.3, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(11, "apple", 'j', 0.3, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(12, "peach", 'b', 0.9, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(13, "orange", 'c', 0.7, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(15, "apple", 'd', 0.2, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(16, "peach", 'j', 0.8, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(16, "orange", 'b', 0.8, true),
- new Tuple5<Integer, String, Character, Double, Boolean>(16, "apple", 'c', 0.1, false),
- new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
+ new Tuple5<>(1, "apple", 'j', 0.1, false),
+ new Tuple5<>(1, "peach", 'b', 0.8, false),
+ new Tuple5<>(1, "orange", 'c', 0.7, true),
+ new Tuple5<>(2, "apple", 'd', 0.5, false),
+ new Tuple5<>(2, "peach", 'j', 0.6, false),
+ new Tuple5<>(3, "orange", 'b', 0.2, true),
+ new Tuple5<>(6, "apple", 'c', 0.1, false),
+ new Tuple5<>(7, "peach", 'd', 0.4, false),
+ new Tuple5<>(8, "orange", 'j', 0.2, true),
+ new Tuple5<>(10, "apple", 'b', 0.1, false),
+ new Tuple5<>(10, "peach", 'c', 0.5, false),
+ new Tuple5<>(11, "orange", 'd', 0.3, true),
+ new Tuple5<>(11, "apple", 'j', 0.3, false),
+ new Tuple5<>(12, "peach", 'b', 0.9, false),
+ new Tuple5<>(13, "orange", 'c', 0.7, true),
+ new Tuple5<>(15, "apple", 'd', 0.2, false),
+ new Tuple5<>(16, "peach", 'j', 0.8, false),
+ new Tuple5<>(16, "orange", 'b', 0.8, true),
+ new Tuple5<>(16, "apple", 'c', 0.1, false),
+ new Tuple5<>(17, "peach", 'd', 1.0, true));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableTimestamps();
SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
sourceStream21
+ .extractTimestamp(new MyTimestampExtractor())
.keyBy(2, 2)
- .window(Time.of(10, new MyTimestamp(), 0))
- .every(Time.of(4, new MyTimestamp(), 0))
+ .timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
.maxBy(3)
- .flatten()
.map(new MyMapFunction2())
.flatMap(new MyFlatMapFunction())
.connect(sourceStream22)
@@ -244,11 +248,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
sourceStream31.filter(new PrimeFilterFunction())
- .window(Count.of(100))
- .max(0).flatten()
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(100)))
+ .max(0)
.union(sourceStream32.filter(new PrimeFilterFunction())
- .window(Count.of(100))
- .max(0).flatten())
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(100)))
+ .max(0))
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
sourceStream31.flatMap(new DivisorsFlatMapFunction())
@@ -257,11 +263,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public Tuple2<Long, Integer> map(Long value) throws Exception {
- return new Tuple2<Long, Integer>(value, 1);
+ return new Tuple2<>(value, 1);
}
})
.keyBy(0)
- .window(Count.of(10000)).sum(1).flatten()
+ .window(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
+ .sum(1)
.filter(new FilterFunction<Tuple2<Long, Integer>>() {
@Override
@@ -275,6 +283,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
@Test
+ @Ignore
public void complexIntegrationTest4() throws Exception {
//Testing mapping and delta-policy windowing with custom class
@@ -290,13 +299,14 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
env.addSource(new RectangleSource())
.global()
.map(new RectangleMapFunction())
- .window(Delta.of(0.0, new MyDelta(), new Tuple2<Rectangle, Integer>(new Rectangle(100, 100), 0)))
- .mapWindow(new MyWindowMapFunction())
- .flatten()
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta())))
+ .apply(new MyWindowMapFunction())
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
env.execute();
@@ -361,6 +371,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Test
+ @Ignore
public void complexIntegrationTest6() throws Exception {
//Testing java collections and date-time types
@@ -376,88 +387,89 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
- ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<Tuple2<Date, HashMap<Character,
- Integer>>>();
- HashMap<Character, Integer> sale1 = new HashMap<Character, Integer>();
+ ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<>();
+ HashMap<Character, Integer> sale1 = new HashMap<>();
sale1.put('a', 2);
sale1.put('c', 2);
sale1.put('d', 1);
sale1.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("03-06-2014"), sale1));
+ sales.add(new Tuple2<>(ft.parse("03-06-2014"), sale1));
- HashMap<Character, Integer> sale2 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale2 = new HashMap<>();
sale2.put('a', 1);
sale2.put('b', 2);
sale2.put('d', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("10-06-2014"), sale2));
+ sales.add(new Tuple2<>(ft.parse("10-06-2014"), sale2));
- HashMap<Character, Integer> sale3 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale3 = new HashMap<>();
sale3.put('a', 3);
sale3.put('b', 1);
sale3.put('c', 2);
sale3.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("29-06-2014"), sale3));
+ sales.add(new Tuple2<>(ft.parse("29-06-2014"), sale3));
- HashMap<Character, Integer> sale4 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale4 = new HashMap<>();
sale4.put('a', 1);
sale4.put('d', 1);
sale4.put('e', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("15-07-2014"), sale4));
+ sales.add(new Tuple2<>(ft.parse("15-07-2014"), sale4));
- HashMap<Character, Integer> sale5 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale5 = new HashMap<>();
sale5.put('b', 2);
sale5.put('c', 3);
sale5.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("24-07-2014"), sale5));
+ sales.add(new Tuple2<>(ft.parse("24-07-2014"), sale5));
- HashMap<Character, Integer> sale6 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale6 = new HashMap<>();
sale6.put('a', 4);
sale6.put('b', 2);
sale6.put('c', 2);
sale6.put('e', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("17-08-2014"), sale6));
+ sales.add(new Tuple2<>(ft.parse("17-08-2014"), sale6));
- HashMap<Character, Integer> sale7 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale7 = new HashMap<>();
sale7.put('a', 2);
sale7.put('b', 2);
sale7.put('c', 3);
sale7.put('d', 1);
sale7.put('e', 1);
sale7.put('f', 2);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("27-08-2014"), sale7));
+ sales.add(new Tuple2<>(ft.parse("27-08-2014"), sale7));
- HashMap<Character, Integer> sale8 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale8 = new HashMap<>();
sale8.put('a', 3);
sale8.put('b', 1);
sale8.put('c', 3);
sale8.put('d', 2);
sale8.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("16-09-2014"), sale8));
+ sales.add(new Tuple2<>(ft.parse("16-09-2014"), sale8));
- HashMap<Character, Integer> sale9 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale9 = new HashMap<>();
sale9.put('a', 1);
sale9.put('b', 3);
sale9.put('c', 4);
sale9.put('d', 1);
sale9.put('e', 1);
sale9.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("25-09-2014"), sale9));
+ sales.add(new Tuple2<>(ft.parse("25-09-2014"), sale9));
- HashMap<Character, Integer> sale10 = new HashMap<Character, Integer>();
+ HashMap<Character, Integer> sale10 = new HashMap<>();
sale10.put('a', 3);
sale10.put('b', 2);
sale10.put('c', 3);
sale10.put('d', 2);
sale10.put('e', 1);
sale10.put('f', 1);
- sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
+ sales.add(new Tuple2<>(ft.parse("01-10-2014"), sale10));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableTimestamps();
DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
- sourceStream6.window(Time.of(1, new Timestamp6()))
- .reduceWindow(new SalesReduceFunction())
- .flatten()
+ sourceStream6
+ .extractTimestamp(new Timestamp6())
+ .timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
+ .reduce(new SalesReduceFunction())
.flatMap(new FlatMapFunction6())
.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
@@ -478,7 +490,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
Boolean> value) throws Exception {
- return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2,
+ return new Tuple4<>(value.f0, value.f1 + "-" + value.f2,
value.f3, value.f4);
}
@@ -509,7 +521,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
for (int i = 0; i < 20; i++) {
- Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
+ Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(1L, new Tuple2<>("a", 1L));
ctx.collect(result);
}
}
@@ -526,17 +538,28 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
- return new Tuple2<Long, Tuple2<String, Long>>(value.f0 + 1, value.f1);
+ return new Tuple2<>(value.f0 + 1, value.f1);
}
}
- private static class MyTimestamp implements Timestamp<Tuple5<Integer, String, Character, Double, Boolean>> {
+ private static class MyTimestampExtractor implements TimestampExtractor<Tuple5<Integer, String, Character, Double, Boolean>> {
private static final long serialVersionUID = 1L;
@Override
- public long getTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value) {
+ public long extractTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value, long currentTimestamp) {
return (long) value.f0;
}
+
+ @Override
+ public long emitWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
+ long currentTimestamp) {
+ return (long) value.f0 - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
+ }
}
private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
@@ -573,7 +596,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
- List<String> output = new ArrayList<String>();
+ List<String> output = new ArrayList<>();
if (value.f0 == 10) {
output.add("iterate");
output.add("firstOutput");
@@ -627,6 +650,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public void run(SourceContext<Rectangle> ctx) throws Exception {
+ // emit once as the initializer of the delta trigger
+ ctx.collect(rectangle);
for (int i = 0; i < 100; i++) {
ctx.collect(rectangle);
rectangle = rectangle.next();
@@ -644,16 +669,15 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public Tuple2<Rectangle, Integer> map(Rectangle value) throws Exception {
- return new Tuple2<Rectangle, Integer>(value, counter++);
+ return new Tuple2<>(value, counter++);
}
}
- private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<Rectangle, Integer>,
- Tuple2<Rectangle, Integer>> {
+ private static class MyWindowMapFunction implements AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, GlobalWindow> {
private static final long serialVersionUID = 1L;
@Override
- public void mapWindow(Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
+ public void apply(GlobalWindow window, Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
Integer>> out) throws Exception {
out.collect(values.iterator().next());
}
@@ -670,14 +694,28 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
}
- private static class Timestamp6 implements Timestamp<Tuple2<Date, HashMap<Character, Integer>>> {
+ private static class Timestamp6 implements TimestampExtractor<Tuple2<Date, HashMap<Character, Integer>>> {
@Override
- public long getTimestamp(Tuple2<Date, HashMap<Character, Integer>> value) {
+ public long extractTimestamp(Tuple2<Date, HashMap<Character, Integer>> value,
+ long currentTimestamp) {
Calendar cal = Calendar.getInstance();
cal.setTime(value.f0);
return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
}
+
+ @Override
+ public long emitWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
+ long currentTimestamp) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(value.f0);
+ return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return 0;
+ }
}
private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
@@ -697,7 +735,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
}
map1.put(key, volume1 + volume2);
}
- return new Tuple2<Date, HashMap<Character, Integer>>(value2.f0, map1);
+ return new Tuple2<>(value2.f0, map1);
}
}
@@ -710,9 +748,9 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
Calendar cal = Calendar.getInstance();
cal.setTime(value.f0);
for (Character key : value.f1.keySet()) {
- out.collect(new Tuple2<Integer, Tuple2<Character, Integer>>(cal.get(Calendar.MONTH)
+ out.collect(new Tuple2<>(cal.get(Calendar.MONTH)
+ 1,
- new Tuple2<Character, Integer>(key, value.f1.get(key))));
+ new Tuple2<>(key, value.f1.get(key))));
}
}
}
@@ -722,7 +760,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@Override
public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
throws Exception {
- ArrayList<Character> list = new ArrayList<Character>();
+ ArrayList<Character> list = new ArrayList<>();
for (Character ch : value.f1.keySet()) {
for (int i = 0; i < value.f1.get(ch); i++) {
list.add(ch);
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
new file mode 100644
index 0000000..c98a659
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ArrayFromTupleTest {
+
+    /** Tuple classes by arity: CLASSES[k] is the tuple type with k + 1 fields. */
+    private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+            Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+            Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+            Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+            Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+            Tuple24.class, Tuple25.class };
+
+    /** Field values used to fill tuples; entry k is the decimal string of k. */
+    private String[] testStrings;
+
+    @Before
+    public void init() {
+        testStrings = new String[Tuple.MAX_ARITY];
+        for (int pos = 0; pos < testStrings.length; pos++) {
+            testStrings[pos] = String.valueOf(pos);
+        }
+    }
+
+    /** Without explicit positions, all fields are expected in tuple order, for every arity. */
+    @Test
+    public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
+        for (int arity = 1; arity <= Tuple.MAX_ARITY; arity++) {
+            Tuple tuple = (Tuple) CLASSES[arity - 1].newInstance();
+            String[] expected = new String[arity];
+            for (int pos = 0; pos < arity; pos++) {
+                tuple.setField(testStrings[pos], pos);
+                expected[pos] = testStrings[pos];
+            }
+            arrayEqualityCheck(expected, new ArrayFromTuple().extract(tuple));
+        }
+    }
+
+    /** Explicit positions are expected back in exactly the requested order, repeats included. */
+    @Test
+    public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+        final int last = Tuple.MAX_ARITY - 1;
+        Tuple fullTuple = (Tuple) CLASSES[last].newInstance();
+        for (int pos = 0; pos <= last; pos++) {
+            fullTuple.setField(testStrings[pos], pos);
+        }
+        arrayEqualityCheck(stringsAt(5, 3, 6, 7, 0), new ArrayFromTuple(5, 3, 6, 7, 0).extract(fullTuple));
+        arrayEqualityCheck(stringsAt(0, last), new ArrayFromTuple(0, last).extract(fullTuple));
+        arrayEqualityCheck(stringsAt(last, 0), new ArrayFromTuple(last, 0).extract(fullTuple));
+        arrayEqualityCheck(stringsAt(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16, 4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2),
+                new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
+                        4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(fullTuple));
+    }
+
+    /** Returns the test strings found at the given positions, in the given order. */
+    private String[] stringsAt(int... positions) {
+        String[] picked = new String[positions.length];
+        for (int k = 0; k < positions.length; k++) {
+            picked[k] = testStrings[positions[k]];
+        }
+        return picked;
+    }
+
+    private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+        assertEquals("The result arrays must have the same length", array1.length, array2.length);
+        for (int idx = 0; idx < array1.length; idx++) {
+            assertEquals("Unequal fields at position " + idx, array1[idx], array2[idx]);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
new file mode 100644
index 0000000..3b098c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConcatenatedExtractTest {
+
+    private String[] testStringArray1 = { "1", "2", "3" };
+    private int[] testIntArray1 = { 1, 2, 3 };
+    private String[] testStringArray2 = { "4", "5", "6" };
+    private int[] testIntArray2 = { 4, 5, 6 };
+    private String[] testStringArray3 = { "7", "8", "9" };
+    private int[] testIntArray3 = { 7, 8, 9 };
+    private Tuple2<String[], int[]>[] testTuple2Array;
+    private Tuple2<String[], int[]> testTuple2;
+    private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
+
+    /** Builds testData as (pair 3, [(strings 1, ints 2), (strings 2, ints 1)]). */
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setupData() {
+        testTuple2 = new Tuple2<>(testStringArray3, testIntArray3);
+
+        testTuple2Array = new Tuple2[2];
+        testTuple2Array[0] = new Tuple2<>(testStringArray1, testIntArray2);
+        testTuple2Array[1] = new Tuple2<>(testStringArray2, testIntArray1);
+
+        testData = new Tuple2<>(testTuple2, testTuple2Array);
+    }
+
+    /** Chain f0 -> f1 -> array positions (2, 1, 0); expects testIntArray3 reversed as Integer[]. */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void test1() {
+        Extractor reversedInts = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
+                .add(new FieldsFromArray(Integer.class, 2, 1, 0));
+        int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
+        for (int k = 0; k < expected.length; k++) {
+            assertEquals(Integer.valueOf(expected[k]), ((Integer[]) reversedInts.extract(testData))[k]);
+        }
+    }
+
+    /** Six-step chain into the tuple array; expects the string "5" (testStringArray2[1]). */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void test2() {
+        Extractor secondString = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
+                new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
+                .add(new FieldFromArray(0)) // Tuple2<String[],int[]>
+                .add(new ArrayFromTuple(0)) // Object[] (Containing String[])
+                .add(new FieldFromArray(0)) // String[]
+                .add(new FieldFromArray(1)); // String
+
+        assertEquals(testStringArray2[1], secondString.extract(testData));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
new file mode 100644
index 0000000..d274f4e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class FieldFromArrayTest {
+
+    String[] testStringArray = { "0", "1", "2", "3", "4" };
+    Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+    int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+    /** Each index of a String[] must yield the element stored there. */
+    @Test
+    public void testStringArray() {
+        for (int i = 0; i < testStringArray.length; i++) {
+            assertEquals(testStringArray[i], new FieldFromArray<String>(i).extract(testStringArray));
+        }
+    }
+
+    /** Same check for an Integer[]; the extractor is typed Integer to match the element type. */
+    @Test
+    public void testIntegerArray() {
+        for (int i = 0; i < testIntegerArray.length; i++) {
+            assertEquals(testIntegerArray[i], new FieldFromArray<Integer>(i).extract(testIntegerArray));
+        }
+    }
+
+    /** For a primitive int[], the extracted value must equal the boxed element. */
+    @Test
+    public void testIntArray() {
+        for (int i = 0; i < testIntArray.length; i++) {
+            assertEquals(Integer.valueOf(testIntArray[i]), new FieldFromArray<Integer>(i).extract(testIntArray));
+        }
+    }
+
+}