You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/25 13:50:29 UTC

[1/3] flink git commit: [hotfix] Remove leftover KeyedTimePanes

Repository: flink
Updated Branches:
  refs/heads/master d8ed58b6a -> 1ebd44a63


[hotfix] Remove leftover KeyedTimePanes

Recently, the aligned window operators were removed; these classes were
left over after that removal.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68a99d7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68a99d7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68a99d7a

Branch: refs/heads/master
Commit: 68a99d7ab7ff10c3c0b6cd19babedbfdbfc31354
Parents: d8ed58b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 25 10:59:39 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 25 10:59:39 2017 +0200

----------------------------------------------------------------------
 .../windowing/AbstractKeyedTimePanes.java       | 156 -------------
 .../windowing/AccumulatingKeyedTimePanes.java   | 224 -------------------
 .../windowing/AggregatingKeyedTimePanes.java    | 119 ----------
 3 files changed, 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
deleted file mode 100644
index f815107..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-
-/**
- * Base class for a multiple key/value maps organized in panes.
- */
-@Internal
-public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-
-	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
-
-	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-
-	/** The latest time pane. */
-	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
-
-	/** The previous time panes, ordered by time (early to late). */
-	protected final ArrayDeque<KeyMap<Key, Aggregate>> previousPanes = new ArrayDeque<>();
-
-	// ------------------------------------------------------------------------
-
-	public abstract void addElementToLatestPane(Type element) throws Exception;
-
-	public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception;
-
-	public void dispose() {
-		// since all is heap data, there is no need to clean up anything
-		latestPane = null;
-		previousPanes.clear();
-	}
-
-	public int getNumPanes() {
-		return previousPanes.size() + 1;
-	}
-
-	public void slidePanes(int panesToKeep) {
-		if (panesToKeep > 1) {
-			// the current pane becomes the latest previous pane
-			previousPanes.addLast(latestPane);
-
-			// truncate the history
-			while (previousPanes.size() >= panesToKeep) {
-				previousPanes.removeFirst();
-			}
-		}
-
-		// we need a new latest pane
-		latestPane = new KeyMap<>();
-	}
-
-	public void truncatePanes(int numToRetain) {
-		while (previousPanes.size() >= numToRetain) {
-			previousPanes.removeFirst();
-		}
-	}
-
-	protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
-		// gather all panes in an array (faster iterations)
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		KeyMap<Key, Aggregate>[] panes = previousPanes.toArray(new KeyMap[previousPanes.size() + 1]);
-		panes[panes.length - 1] = latestPane;
-
-		// let the maps make a coordinated traversal and evaluate the window function per contained key
-		KeyMap.traverseMaps(panes, traversal, traversalPass);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialization and de-serialization
-	// ------------------------------------------------------------------------
-
-	public void writeToOutput(
-			final DataOutputView output,
-			final TypeSerializer<Key> keySerializer,
-			final TypeSerializer<Aggregate> aggSerializer) throws IOException {
-		output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
-
-		int numPanes = getNumPanes();
-		output.writeInt(numPanes);
-
-		// write from the past
-		Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
-		for (int paneNum = 0; paneNum < numPanes; paneNum++) {
-			output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
-			KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
-
-			output.writeInt(pane.size());
-			for (KeyMap.Entry<Key, Aggregate> entry : pane) {
-				keySerializer.serialize(entry.getKey(), output);
-				aggSerializer.serialize(entry.getValue(), output);
-			}
-		}
-	}
-
-	public void readFromInput(
-			final DataInputView input,
-			final TypeSerializer<Key> keySerializer,
-			final TypeSerializer<Aggregate> aggSerializer) throws IOException {
-		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
-		int numPanes = input.readInt();
-
-		// read from the past towards the presence
-		while (numPanes > 0) {
-			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
-			KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
-
-			final int numElementsInPane = input.readInt();
-			for (int i = numElementsInPane - 1; i >= 0; i--) {
-				Key k = keySerializer.deserialize(input);
-				Aggregate a = aggSerializer.deserialize(input);
-				pane.put(k, a);
-			}
-
-			if (numPanes > 1) {
-				previousPanes.addLast(pane);
-			}
-			numPanes--;
-		}
-	}
-
-	private static void validateMagicNumber(int expected, int found) throws IOException {
-		if (expected != found) {
-			throw new IOException("Corrupt state stream - wrong magic number. " +
-				"Expected '" + Integer.toHexString(expected) +
-				"', found '" + Integer.toHexString(found) + '\'');
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
deleted file mode 100644
index 6892aaa..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyedStateStore;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.UnionIterator;
-
-import java.util.ArrayList;
-
-/**
- * Key/value map organized in panes for accumulating windows (with a window function).
- */
-@Internal
-public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-
-	private final KeySelector<Type, Key> keySelector;
-
-	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
-
-	private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
-
-	private final AccumulatingKeyedTimePanesContext context;
-
-	/**
-	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
-	 * have (zero).
-	 */
-	private long evaluationPass = 1L;
-
-	// ------------------------------------------------------------------------
-
-	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
-		this.keySelector = keySelector;
-		this.function = function;
-		this.context = new AccumulatingKeyedTimePanesContext();
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		Key k = keySelector.getKey(element);
-		ArrayList<Type> elements = latestPane.putIfAbsent(k, listFactory);
-		elements.add(element);
-	}
-
-	@Override
-	public void evaluateWindow(Collector<Result> out, final TimeWindow window,
-								AbstractStreamOperator<Result> operator) throws Exception {
-		if (previousPanes.isEmpty()) {
-			// optimized path for single pane case (tumbling window)
-			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
-				Key key = entry.getKey();
-				operator.setCurrentKey(key);
-				context.globalState = operator.getKeyedStateStore();
-
-				function.process(entry.getKey(), window, context, entry.getValue(), out);
-			}
-		}
-		else {
-			// general code path for multi-pane case
-			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
-					function, window, out, operator, context);
-			traverseAllPanes(evaluator, evaluationPass);
-		}
-
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Running a window function in a map traversal
-	// ------------------------------------------------------------------------
-
-	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
-
-		private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
-
-		private final UnionIterator<Type> unionIterator;
-
-		private final Collector<Result> out;
-
-		private final TimeWindow window;
-
-		private final AbstractStreamOperator<Result> contextOperator;
-
-		private Key currentKey;
-
-		private AccumulatingKeyedTimePanesContext context;
-
-		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
-								Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
-			this.function = function;
-			this.out = out;
-			this.unionIterator = new UnionIterator<>();
-			this.window = window;
-			this.contextOperator = contextOperator;
-			this.context = context;
-		}
-
-		@Override
-		public void startNewKey(Key key) {
-			unionIterator.clear();
-			currentKey = key;
-		}
-
-		@Override
-		public void nextValue(ArrayList<Type> value) {
-			unionIterator.addList(value);
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			contextOperator.setCurrentKey(currentKey);
-			context.globalState = contextOperator.getKeyedStateStore();
-			function.process(currentKey, window, context, unionIterator, out);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Lazy factory for lists (put if absent)
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
-		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
-	}
-
-	private static class ThrowingKeyedStateStore implements KeyedStateStore {
-		@Override
-		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
-		}
-
-		@Override
-		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
-		}
-
-		@Override
-		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
-		}
-
-		@Override
-		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
-			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
-		}
-
-		@Override
-		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
-			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
-		}
-	}
-
-	private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
-		KeyedStateStore globalState;
-		KeyedStateStore throwingStore;
-
-		public AccumulatingKeyedTimePanesContext() {
-			this.throwingStore = new ThrowingKeyedStateStore();
-		}
-
-		@Override
-		public long currentProcessingTime() {
-			throw new UnsupportedOperationException("current processing time is not supported in this context");
-		}
-
-		@Override
-		public long currentWatermark() {
-			throw new UnsupportedOperationException("current watermark is not supported in this context");
-		}
-
-		@Override
-		public KeyedStateStore windowState() {
-			return throwingStore;
-		}
-
-		@Override
-		public KeyedStateStore globalState() {
-			return globalState;
-		}
-	}
-
-	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
-
-		@Override
-		public ArrayList<?> create() {
-			return new ArrayList<>(4);
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/68a99d7a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
deleted file mode 100644
index 66d41f1..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Key/value map organized in panes for aggregating windows (with a reduce function).
- */
-@Internal
-public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-
-	private final KeySelector<Type, Key> keySelector;
-
-	private final ReduceFunction<Type> reducer;
-
-	/**
-	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
-	 * have (zero).
-	 */
-	private long evaluationPass = 1L;
-
-	// ------------------------------------------------------------------------
-
-	public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
-		this.keySelector = keySelector;
-		this.reducer = reducer;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void addElementToLatestPane(Type element) throws Exception {
-		Key k = keySelector.getKey(element);
-		latestPane.putOrAggregate(k, element, reducer);
-	}
-
-	@Override
-	public void evaluateWindow(Collector<Type> out, TimeWindow window,
-								AbstractStreamOperator<Type> operator) throws Exception {
-		if (previousPanes.isEmpty()) {
-			// optimized path for single pane case
-			for (KeyMap.Entry<Key, Type> entry : latestPane) {
-				out.collect(entry.getValue());
-			}
-		}
-		else {
-			// general code path for multi-pane case
-			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator);
-			traverseAllPanes(evaluator, evaluationPass);
-		}
-
-		evaluationPass++;
-	}
-
-	// ------------------------------------------------------------------------
-	//  The maps traversal that performs the final aggregation
-	// ------------------------------------------------------------------------
-
-	static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
-
-		private final ReduceFunction<Type> function;
-
-		private final Collector<Type> out;
-
-		private final AbstractStreamOperator<Type> operator;
-
-		private Type currentValue;
-
-		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out,
-								AbstractStreamOperator<Type> operator) {
-			this.function = function;
-			this.out = out;
-			this.operator = operator;
-		}
-
-		@Override
-		public void startNewKey(Key key) {
-			currentValue = null;
-			operator.setCurrentKey(key);
-		}
-
-		@Override
-		public void nextValue(Type value) throws Exception {
-			if (currentValue != null) {
-				currentValue = function.reduce(currentValue, value);
-			}
-			else {
-				currentValue = value;
-			}
-		}
-
-		@Override
-		public void keyDone() throws Exception {
-			out.collect(currentValue);
-		}
-	}
-}


[2/3] flink git commit: [FLINK-7635] Support side output in ProcessWindowFunction

Posted by al...@apache.org.
[FLINK-7635] Support side output in ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c151a537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c151a537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c151a537

Branch: refs/heads/master
Commit: c151a537c205d20db598354ba5afc4f228c746c3
Parents: 68a99d7
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Sep 19 23:35:34 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 25 12:06:51 2017 +0200

----------------------------------------------------------------------
 .../InternalProcessApplyWindowContext.java      |  6 +++
 .../windowing/ProcessWindowFunction.java        |  9 ++++
 .../api/operators/ProcessOperator.java          |  5 +--
 .../operators/windowing/WindowOperator.java     |  7 +++
 .../functions/InternalProcessWindowContext.java |  6 +++
 .../functions/InternalWindowFunction.java       |  3 ++
 .../scala/function/ProcessWindowFunction.scala  |  9 ++--
 .../ScalaProcessWindowFunctionWrapper.scala     |  5 +++
 .../streaming/api/scala/SideOutputITCase.scala  | 46 ++++++++++++++++++++
 .../streaming/runtime/SideOutputITCase.java     | 35 +++++++++++++++
 10 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
index 47a2e3a..3d52e35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -64,4 +65,9 @@ public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
 	public KeyedStateStore globalState() {
 		return context.globalState();
 	}
+
+	@Override
+	public <X> void output(OutputTag<X> outputTag, X value) {
+		context.output(outputTag, value);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 506b610..08ed49c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
@@ -85,5 +86,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> exte
 		 * State accessor for per-key global state.
 		 */
 		public abstract KeyedStateStore globalState();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 5c9e8fc..b353a63 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -73,10 +73,7 @@ public class ProcessOperator<IN, OUT>
 		this.currentWatermark = mark.getTimestamp();
 	}
 
-	private class ContextImpl
-			extends ProcessFunction<IN, OUT>.Context
-			implements TimerService {
-
+	private class ContextImpl extends ProcessFunction<IN, OUT>.Context implements TimerService {
 		private StreamRecord<IN> element;
 
 		private final ProcessingTimeService processingTimeService;

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b14739f..fd90e65 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -774,6 +774,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		public KeyedStateStore globalState() {
 			return WindowOperator.this.getKeyedStateStore();
 		}
+
+		public <X> void output(OutputTag<X> outputTag, X value) {
+			if (outputTag == null) {
+				throw new IllegalArgumentException("OutputTag must not be null.");
+			}
+			output.collect(outputTag, new StreamRecord<>(value, window.maxTimestamp()));
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
index 9505332..4d5d1c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal reusable context wrapper.
@@ -66,4 +67,9 @@ public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
 	public KeyedStateStore globalState() {
 		return internalContext.globalState();
 	}
+
+	@Override
+	public <X> void output(OutputTag<X> outputTag, X value) {
+		internalContext.output(outputTag, value);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 0999565..c304d7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Internal interface for functions that are evaluated over keyed (grouped) windows.
@@ -63,5 +64,7 @@ public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends
 		KeyedStateStore windowState();
 
 		KeyedStateStore globalState();
+
+		<X> void output(OutputTag<X> outputTag, X value);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index d2075db..7ae51ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.io.Serializable
-
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -88,6 +87,10 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
       * State accessor for per-key global state.
       */
     def globalState: KeyedStateStore
-  }
 
+    /**
+      * Emits a record to the side output identified by the [[OutputTag]].
+      */
+    def output[X](outputTag: OutputTag[X], value: X);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index bc4b7dd..98b050c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
 import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.OutputTag
 import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -56,6 +57,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
     }
     func.process(key, ctx, elements.asScala, out)
   }
@@ -71,6 +74,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
       override def windowState = context.windowState()
 
       override def globalState = context.globalState()
+
+      override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
     }
     func.clear(ctx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 29bcbcf..f09323c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -234,6 +234,52 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), lateResultSink.getResult)
   }
 
+  /**
+    * Test ProcessWindowFunction side output.
+    */
+  @Test
+  def testProcessWindowFunctionSideOutput() {
+    val resultSink = new TestListResultSink[String]
+    val sideOutputResultSink = new TestListResultSink[String]
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))
+
+
+    val sideOutputTag = OutputTag[String]("side")
+
+    val windowOperator = dataStream
+      .assignTimestampsAndWatermarks(new TestAssigner)
+      .keyBy(i => i._1)
+      .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+      .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
+        override def process(
+            key: String,
+            context: Context,
+            elements: Iterable[(String, Int)],
+            out: Collector[String]): Unit = {
+          for (in <- elements) {
+            out.collect(in._1)
+            context.output(sideOutputTag, "sideout-" + in._1)
+          }
+        }
+      })
+
+    windowOperator
+      .getSideOutput(sideOutputTag)
+      .addSink(sideOutputResultSink)
+
+    windowOperator.addSink(resultSink)
+
+    env.execute()
+
+    assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+    assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
+                  sideOutputResultSink.getResult)
+  }
 }
 
 class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {

http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index f73bf42..f74f8ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -547,4 +548,38 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 		assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
 	}
 
+	@Test
+	public void testProcessdWindowFunctionSideOutput() throws Exception {
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+		see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Integer> dataStream = see.fromCollection(elements);
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+		SingleOutputStreamOperator<Integer> windowOperator = dataStream
+				.assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+				.keyBy(new TestKeySelector())
+				.timeWindow(Time.milliseconds(1), Time.milliseconds(1))
+				.process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
+						out.collect(integer);
+						context.output(sideOutputTag, "sideout-" + String.valueOf(integer));
+					}
+				});
+
+		windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+		windowOperator.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
+	}
 }


[3/3] flink git commit: [FLINK-7635] Add side-output test in WindowOperatorContractTest

Posted by al...@apache.org.
[FLINK-7635] Add side-output test in WindowOperatorContractTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ebd44a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ebd44a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ebd44a6

Branch: refs/heads/master
Commit: 1ebd44a634fe5053c89acf7092571a6b169f11b9
Parents: c151a53
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 25 15:49:44 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 25 15:49:44 2017 +0200

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ebd44a6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8ceda45..bd263f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -366,6 +366,60 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 	}
 
+	/**
+	 * This also verifies that the timestamps of side-emitted records are correct.
+	 */
+	@Test
+	public void testSideOutput() throws Exception {
+
+		final OutputTag<Integer> integerOutputTag = new OutputTag<Integer>("int-out") {};
+		final OutputTag<Long> longOutputTag = new OutputTag<Long>("long-out") {};
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> windowFunction =
+			new InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow>() {
+				@Override
+				public void process(
+						Integer integer,
+						TimeWindow window,
+						InternalWindowContext ctx,
+						Iterable<Integer> input,
+						Collector<Void> out) throws Exception {
+					Integer inputValue = input.iterator().next();
+
+					ctx.output(integerOutputTag, inputValue);
+					ctx.output(longOutputTag, inputValue.longValue());
+				}
+
+				@Override
+				public void clear(
+					TimeWindow window,
+					InternalWindowContext context) throws Exception {}
+			};
+
+		OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 0L, windowFunction);
+
+		testHarness.open();
+
+		final long windowEnd = 42L;
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Collections.singletonList(new TimeWindow(0, windowEnd)));
+
+		shouldFireOnElement(mockTrigger);
+
+		testHarness.processElement(new StreamRecord<>(17, 5L));
+
+		assertThat(testHarness.getSideOutput(integerOutputTag),
+			contains(isStreamRecord(17, windowEnd - 1)));
+
+		assertThat(testHarness.getSideOutput(longOutputTag),
+			contains(isStreamRecord(17L, windowEnd - 1)));
+	}
+
 	@Test
 	public void testAssignerIsInvokedOncePerElement() throws Exception {