You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:53 UTC
[12/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
deleted file mode 100644
index 8ffca91..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Class that encapsulates the functionality necessary to merge windows created
- * in parallel. This CoFlatMap uses the information received on the number of
- * parts for each window to merge the different parts. It waits until it
- * receives an indication on the number of parts from all the discretizers
- * before producing any output.
- *
- * <p>
- * The first input carries the window parts (each expected to hold exactly one
- * already reduced element), the second input carries
- * {@code (windowID, numberOfParts)} pairs, one per discretizer and window.
- */
-public class ParallelMerge<OUT> extends
-        RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
-
-    private static final long serialVersionUID = 1L;
-
-    // Number of part-count reports to wait for per window; set in open()
-    protected Integer numberOfDiscretizers;
-    private ReduceFunction<OUT> reducer;
-
-    // window id -> total number of parts to wait for, stored once every
-    // discretizer has reported but not all parts have arrived yet
-    private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
-    // window id -> (window merged so far, number of parts merged into it)
-    private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
-    // window id -> (sum of the reported part counts, number of reports received)
-    private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
-
-    /**
-     * @param reducer
-     *            the function used to combine the single elements of two parts
-     *            that belong to the same window
-     */
-    public ParallelMerge(ReduceFunction<OUT> reducer) {
-        this.reducer = reducer;
-    }
-
-    /**
-     * Merges an arriving window part into the part buffered for the same
-     * window id. The merged window is emitted as soon as the total number of
-     * parts for that id is known and has been reached; otherwise it stays
-     * buffered.
-     *
-     * @param nextWindow
-     *            the arriving window part
-     * @param out
-     *            collector for completed windows
-     * @throws Exception
-     *             if the reducer fails or the parts do not hold exactly one
-     *             element each
-     */
-    @Override
-    public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
-            throws Exception {
-
-        Integer id = nextWindow.windowID;
-
-        Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-        if (current == null) {
-            current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
-        } else {
-            updateCurrent(current.f0, nextWindow);
-            current.f1++;
-        }
-
-        // Only present once all discretizers have reported their part count
-        Integer expectedParts = availableNumberOfParts.get(id);
-
-        if (expectedParts != null && expectedParts <= current.f1) {
-            out.collect(current.f0);
-            receivedWindows.remove(id);
-            availableNumberOfParts.remove(id);
-
-            checkOld(id);
-
-        } else {
-            receivedWindows.put(id, current);
-        }
-    }
-
-    /**
-     * Sanity check executed after the window with the given id was emitted: a
-     * partial window with the preceding id that is still buffered indicates an
-     * error in the processing logic.
-     *
-     * @param id
-     *            id of the window that has just been emitted
-     * @throws RuntimeException
-     *             if a partial window with id {@code id - 1} is still buffered
-     */
-    private void checkOld(Integer id) {
-        Integer previousId = id - 1;
-        if (receivedWindows.containsKey(previousId)) {
-            throw new RuntimeException("Error in processing logic, window with id " + previousId
-                    + " should have already been processed");
-        }
-
-    }
-
-    /**
-     * Accumulates the part counts reported for a window id. Once
-     * {@link #numberOfDiscretizers} reports have arrived, the buffered window
-     * is emitted if it already contains all parts; otherwise the total is
-     * remembered so that {@link #flatMap1} can emit the window when the last
-     * part arrives.
-     *
-     * @param partInfo
-     *            {@code f0} is the window id, {@code f1} the number of parts
-     *            reported for it
-     * @param out
-     *            collector for completed windows
-     */
-    @Override
-    public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
-            throws Exception {
-
-        Integer id = partInfo.f0;
-        Integer numOfParts = partInfo.f1;
-
-        Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
-        if (currentPartInfo != null) {
-            currentPartInfo.f0 += numOfParts;
-            currentPartInfo.f1++;
-        } else {
-            currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
-            receivedNumberOfParts.put(id, currentPartInfo);
-        }
-
-        if (currentPartInfo.f1 >= numberOfDiscretizers) {
-            receivedNumberOfParts.remove(id);
-
-            Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
-
-            // -1 marks "no part received yet", so a total of 0 never emits
-            Integer count = current != null ? current.f1 : -1;
-
-            if (count >= currentPartInfo.f0) {
-                out.collect(current.f0);
-                receivedWindows.remove(id);
-                checkOld(id);
-            } else if (currentPartInfo.f0 > 0) {
-                // Remember the total number of parts (f0), which is what
-                // flatMap1 compares the received count against; f1 is only
-                // the number of reports received.
-                availableNumberOfParts.put(id, currentPartInfo.f0);
-            }
-        }
-
-    }
-
-    /**
-     * Reduces the single element of {@code nextWindow} into the single element
-     * of {@code current}, replacing the latter with the result.
-     *
-     * @throws RuntimeException
-     *             if either window does not contain exactly one element
-     */
-    protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
-            throws Exception {
-        if (current.size() != 1 || nextWindow.size() != 1) {
-            throw new RuntimeException(
-                    "Error in parallel merge logic. Current window should contain only one element.");
-        }
-        OUT currentReduced = current.remove(0);
-        currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
-        current.add(currentReduced);
-    }
-
-    /**
-     * Takes the number of parallel subtasks from the runtime context as the
-     * number of discretizers to wait for.
-     */
-    @Override
-    public void open(Configuration conf) {
-        this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
deleted file mode 100644
index d4776e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Invokable implementing the discretization step of a window transformation.
- * It feeds every input to the user supplied trigger and eviction policies and
- * forwards the resulting element, trigger and eviction {@link WindowEvent}s to
- * the collector, from which the {@link StreamWindow}s are built in the next
- * stages.
- */
-public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>> {
-
-    /**
-     * Auto-generated serial version UID
-     */
-    private static final long serialVersionUID = -8038984294071650730L;
-
-    protected TriggerPolicy<IN> triggerPolicy;
-    protected EvictionPolicy<IN> evictionPolicy;
-    private boolean isActiveTrigger;
-    private boolean isActiveEviction;
-    private Thread activePolicyThread;
-    // Number of elements emitted and not yet evicted
-    private int bufferSize = 0;
-
-    // Single event instance that is re-populated for every emitted event
-    protected WindowEvent<IN> windowEvent = new WindowEvent<IN>();
-
-    /**
-     * @param triggerPolicy
-     *            policy deciding when a trigger event is emitted
-     * @param evictionPolicy
-     *            policy deciding how many elements are evicted
-     */
-    public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
-        super(null);
-
-        this.triggerPolicy = triggerPolicy;
-        this.evictionPolicy = evictionPolicy;
-
-        this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
-        this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
-    }
-
-    public TriggerPolicy<IN> getTrigger() {
-        return triggerPolicy;
-    }
-
-    public EvictionPolicy<IN> getEviction() {
-        return evictionPolicy;
-    }
-
-    /**
-     * Processes inputs until the invokable is stopped or the input is
-     * exhausted, then interrupts the active policy thread (if any) and emits a
-     * final trigger event.
-     */
-    @Override
-    public void invoke() throws Exception {
-
-        while (isRunning && readNext() != null) {
-            processRealElement(nextObject);
-        }
-
-        if (activePolicyThread != null) {
-            activePolicyThread.interrupt();
-        }
-
-        // NOTE(review): interrupt() does not wait for the policy thread and
-        // this call is made outside the monitor used by triggerOnFakeElement;
-        // confirm the two cannot race on windowEvent.
-        emitWindow();
-
-    }
-
-    /**
-     * Processes an arrived real element. The method is synchronized to ensure
-     * that it cannot interleave with
-     * {@link StreamDiscretizer#triggerOnFakeElement(Object)}
-     *
-     * @param input
-     *            a real input element
-     * @throws Exception
-     */
-    protected synchronized void processRealElement(IN input) throws Exception {
-
-        // The element is set up front in order to avoid NullFieldException
-        // when triggering on a fake element
-        windowEvent.setElement(input);
-
-        if (isActiveTrigger) {
-            ActiveTriggerPolicy<IN> activeTrigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
-            for (Object fakeElement : activeTrigger.preNotifyTrigger(input)) {
-                triggerOnFakeElement(fakeElement);
-            }
-        }
-
-        boolean triggered = triggerPolicy.notifyTrigger(input);
-        if (triggered) {
-            emitWindow();
-        }
-
-        evict(input, triggered);
-
-        collector.collect(windowEvent.setElement(input));
-        bufferSize++;
-
-    }
-
-    /**
-     * Triggers on an arrived fake element. The method is synchronized to
-     * ensure that it cannot interleave with
-     * {@link StreamDiscretizer#processRealElement(Object)}
-     *
-     * @param input
-     *            a fake input element
-     */
-    @SuppressWarnings("unchecked")
-    protected synchronized void triggerOnFakeElement(Object input) {
-        if (!isActiveEviction) {
-            // Passive eviction: trigger first, then evict as triggered
-            emitWindow();
-            evict((IN) input, true);
-            return;
-        }
-        // Active eviction: evict based on the fake element, then trigger
-        activeEvict(input);
-        emitWindow();
-    }
-
-    /**
-     * Sends a trigger event to the collector.
-     */
-    protected void emitWindow() {
-        collector.collect(windowEvent.setTrigger());
-    }
-
-    /**
-     * Asks the active eviction policy how many elements to evict for the given
-     * fake element and applies the eviction. Does nothing when the eviction
-     * policy is not active.
-     */
-    private void activeEvict(Object input) {
-        if (!isActiveEviction) {
-            return;
-        }
-        ActiveEvictionPolicy<IN> activePolicy = (ActiveEvictionPolicy<IN>) evictionPolicy;
-        applyEviction(activePolicy.notifyEvictionWithFakeElement(input, bufferSize));
-    }
-
-    /**
-     * Asks the eviction policy how many elements to evict for the given real
-     * element and applies the eviction.
-     */
-    private void evict(IN input, boolean isTriggered) {
-        applyEviction(evictionPolicy.notifyEviction(input, isTriggered, bufferSize));
-    }
-
-    /**
-     * Emits an eviction event for a positive count and shrinks the tracked
-     * buffer size accordingly, never letting it drop below zero.
-     */
-    private void applyEviction(int numToEvict) {
-        if (numToEvict <= 0) {
-            return;
-        }
-        collector.collect(windowEvent.setEviction(numToEvict));
-        bufferSize = Math.max(bufferSize - numToEvict, 0);
-    }
-
-    /**
-     * Opens the invokable and, for an active trigger policy that supplies a
-     * runnable, starts that runnable in its own thread.
-     */
-    @Override
-    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        if (!isActiveTrigger) {
-            return;
-        }
-
-        ActiveTriggerPolicy<IN> activeTrigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
-        Runnable policyRunnable = activeTrigger.createActiveTriggerRunnable(new WindowingCallback());
-        if (policyRunnable != null) {
-            activePolicyThread = new Thread(policyRunnable);
-            activePolicyThread.start();
-        }
-    }
-
-    /**
-     * Callback handed to the active trigger runnable so that it can push fake
-     * elements into this discretizer.
-     */
-    private class WindowingCallback implements ActiveTriggerCallback {
-
-        @Override
-        public void sendFakeElement(Object datapoint) {
-            triggerOnFakeElement(datapoint);
-        }
-
-    }
-
-    /**
-     * Two discretizers are equal when both are plain (non-grouped)
-     * {@link StreamDiscretizer}s with equal trigger and eviction policies.
-     */
-    @Override
-    public boolean equals(Object other) {
-        boolean plainDiscretizer = other instanceof StreamDiscretizer
-                && !(other instanceof GroupedStreamDiscretizer);
-        if (!plainDiscretizer) {
-            return false;
-        }
-
-        try {
-            @SuppressWarnings("unchecked")
-            StreamDiscretizer<IN> that = (StreamDiscretizer<IN>) other;
-
-            return triggerPolicy.equals(that.triggerPolicy)
-                    && evictionPolicy.equals(that.evictionPolicy);
-
-        } catch (ClassCastException e) {
-            return false;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
-                + evictionPolicy.toString() + ")";
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
deleted file mode 100644
index fbd8258..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-
-/**
- * Invokable that applies the incoming {@link WindowEvent}s of a discretizer
- * to its attached {@link WindowBuffer}.
- */
-public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
-
-    private static final long serialVersionUID = 1L;
-
-    protected WindowBuffer<T> buffer;
-
-    /**
-     * @param buffer
-     *            the window buffer the events are applied to
-     */
-    public WindowBufferInvokable(WindowBuffer<T> buffer) {
-        super(null);
-        this.buffer = buffer;
-        withoutInputCopy();
-    }
-
-    @Override
-    public void invoke() throws Exception {
-        while (isRunning && readNext() != null) {
-            callUserFunctionAndLogException();
-        }
-    }
-
-    /**
-     * Applies the current input event to this invokable's own buffer.
-     */
-    @Override
-    protected void callUserFunction() throws Exception {
-        handleWindowEvent(nextObject, buffer);
-    }
-
-    /**
-     * Dispatches a window event to the given buffer: elements are stored,
-     * evictions remove elements and triggers make the buffer emit its window
-     * to the collector. Events of any other kind are ignored.
-     */
-    protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
-            throws Exception {
-        if (windowEvent.isElement()) {
-            buffer.store(windowEvent.getElement());
-            return;
-        }
-        if (windowEvent.isEviction()) {
-            buffer.evict(windowEvent.getEviction());
-            return;
-        }
-        if (windowEvent.isTrigger()) {
-            buffer.emitWindow(collector);
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
deleted file mode 100644
index 4aff6c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable flattens the results of the window transformations by
- * outputting the elements of each {@link StreamWindow} one-by-one
- */
-public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
-
-    private static final long serialVersionUID = 1L;
-
-    public WindowFlattener() {
-        super(null);
-        withoutInputCopy();
-    }
-
-    @Override
-    public void invoke() throws Exception {
-        while (isRunning && readNext() != null) {
-            callUserFunctionAndLogException();
-        }
-    }
-
-    /**
-     * Forwards every element of the current input window to the collector, in
-     * iteration order.
-     */
-    @Override
-    protected void callUserFunction() throws Exception {
-        StreamWindow<T> window = nextObject;
-        for (T item : window) {
-            collector.collect(item);
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
deleted file mode 100644
index aa398c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Invokable applying foldWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
-
-    private static final long serialVersionUID = 1L;
-    FoldFunction<IN, OUT> folder;
-
-    /**
-     * @param folder
-     *            the fold function applied to the elements of each window
-     * @param initialValue
-     *            the value every fold starts from
-     */
-    public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
-        super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
-        this.folder = folder;
-        withoutInputCopy();
-    }
-
-    /**
-     * Map function folding a whole window into a window holding at most one
-     * element.
-     */
-    private static class WindowFoldFunction<IN, OUT> implements
-            MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
-
-        private static final long serialVersionUID = 1L;
-        private OUT initialValue;
-        FoldFunction<IN, OUT> folder;
-
-        public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
-            this.folder = folder;
-            this.initialValue = initialValue;
-        }
-
-        /**
-         * Creates an output window carrying the id and number of parts of the
-         * input window. A non-empty input is folded, starting from the initial
-         * value, into the single element of the output; an empty input yields
-         * an empty output.
-         */
-        @Override
-        public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
-            StreamWindow<OUT> folded = new StreamWindow<OUT>(window.windowID);
-            folded.numberOfParts = window.numberOfParts;
-
-            if (window.isEmpty()) {
-                return folded;
-            }
-
-            OUT result = initialValue;
-            for (int index = 0; index < window.size(); index++) {
-                result = folder.fold(result, window.get(index));
-            }
-            folded.add(result);
-            return folded;
-        }
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
deleted file mode 100644
index a065f4e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.function.WindowMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Invokable applying mapWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
-
-    private static final long serialVersionUID = 1L;
-
-    WindowMapFunction<IN, OUT> mapper;
-
-    /**
-     * @param mapper
-     *            the user function applied to each window
-     */
-    public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
-        super(new WindowMap<IN, OUT>(mapper));
-        this.mapper = mapper;
-        withoutInputCopy();
-    }
-
-    /**
-     * Adapter exposing a {@link WindowMapFunction} as a map function from
-     * window to window.
-     */
-    private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
-
-        private static final long serialVersionUID = 1L;
-        WindowMapFunction<T, R> mapper;
-
-        public WindowMap(WindowMapFunction<T, R> mapper) {
-            this.mapper = mapper;
-        }
-
-        /**
-         * Creates an output window with the id and number of parts of the
-         * input window and lets the wrapped function fill it.
-         */
-        @Override
-        public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
-            StreamWindow<R> result = new StreamWindow<R>(window.windowID);
-            result.numberOfParts = window.numberOfParts;
-            mapper.mapWindow(window, result);
-            return result;
-        }
-
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
deleted file mode 100644
index 4c112d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable merges together the different partitions of the
- * {@link StreamWindow}s. It is used to merge the results of parallel
- * transformations that belong in the same window.
- */
-public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
-    private static final long serialVersionUID = 1L;
-
-    // window id -> window merged so far from the parts received for that id
-    private Map<Integer, StreamWindow<T>> windows;
-
-    public WindowMerger() {
-        super(null);
-        this.windows = new HashMap<Integer, StreamWindow<T>>();
-        withoutInputCopy();
-    }
-
-    @Override
-    public void invoke() throws Exception {
-        while (isRunning && readNext() != null) {
-            callUserFunctionAndLogException();
-        }
-    }
-
-    /**
-     * Merges the current input window into the window buffered under the same
-     * id, if there is one. The result is emitted and dropped from the buffer
-     * when its number of parts equals 1; otherwise it is buffered.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void callUserFunction() throws Exception {
-        StreamWindow<T> incoming = nextObject;
-
-        StreamWindow<T> merged = windows.get(incoming.windowID);
-        if (merged != null) {
-            merged = StreamWindow.merge(merged, incoming);
-        } else {
-            merged = incoming;
-        }
-
-        if (merged.numberOfParts == 1) {
-            collector.collect(merged);
-            windows.remove(incoming.windowID);
-        } else {
-            windows.put(incoming.windowID, merged);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
deleted file mode 100644
index 416b915..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * This FlatMapFunction is used to send the number of parts for each window ID
- * (for each parallel discretizer) to the parallel merger that will use is to
- * merge parallel discretized windows
- */
-public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- Integer lastIndex = -1;
-
- @Override
- public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
- throws Exception {
-
- // We dont emit new values for the same index, this avoids sending the
- // same information for the same partitioned window multiple times
- if (value.windowID != lastIndex) {
-
- // For empty windows we send 0 since these windows will be filtered
- // out
- if (value.isEmpty()) {
- out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
- } else {
- out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
- }
- lastIndex = value.windowID;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
deleted file mode 100644
index 9672b0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable applies either split or key partitioning depending on the
- * transformation.
- */
-public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
- private KeySelector<T, ?> keySelector;
- private int numberOfSplits;
-
- public WindowPartitioner(KeySelector<T, ?> keySelector) {
- super(null);
- this.keySelector = keySelector;
- withoutInputCopy();
- }
-
- public WindowPartitioner(int numberOfSplits) {
- super(null);
- this.numberOfSplits = numberOfSplits;
- withoutInputCopy();
- }
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- StreamWindow<T> currentWindow = nextObject;
- if (keySelector == null) {
- if (numberOfSplits <= 1) {
- collector.collect(currentWindow);
- } else {
- for (StreamWindow<T> window : StreamWindow.split(currentWindow, numberOfSplits)) {
- collector.collect(window);
- }
- }
- } else {
-
- for (StreamWindow<T> window : StreamWindow
- .partitionBy(currentWindow, keySelector, true)) {
- collector.collect(window);
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
deleted file mode 100644
index 67d42b5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable is used to apply reduceWindow transformations on
- * {@link WindowedDataStream}s.
- */
-public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> {
-
- private static final long serialVersionUID = 1L;
-
- ReduceFunction<IN> reducer;
-
- public WindowReducer(ReduceFunction<IN> reducer) {
- super(new WindowReduceFunction<IN>(reducer));
- this.reducer = reducer;
- withoutInputCopy();
- }
-
- private static class WindowReduceFunction<T> implements
- MapFunction<StreamWindow<T>, StreamWindow<T>> {
-
- private static final long serialVersionUID = 1L;
- ReduceFunction<T> reducer;
-
- public WindowReduceFunction(ReduceFunction<T> reducer) {
- this.reducer = reducer;
- }
-
- @Override
- public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
- StreamWindow<T> outputWindow = new StreamWindow<T>(window.windowID);
- outputWindow.numberOfParts = window.numberOfParts;
-
- if (!window.isEmpty()) {
- T reduced = window.get(0);
- for (int i = 1; i < window.size(); i++) {
- reduced = reducer.reduce(reduced, window.get(i));
- }
- outputWindow.add(reduced);
- }
- return outputWindow;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
new file mode 100644
index 0000000..cc0790c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/ChainableStreamOperator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for operators that use the {@link ChainingStrategy#ALWAYS}
+ * chaining strategy. Such an operator is itself a {@link Collector} of its
+ * input type: every record passed to {@link #collect(Object)} is stored as the
+ * current input and handed to the user function right away.
+ *
+ * @param <IN>
+ *            The input type of the operator
+ * @param <OUT>
+ *            The output type of the operator
+ */
+public abstract class ChainableStreamOperator<IN, OUT> extends StreamOperator<IN, OUT> implements
+        Collector<IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    // Whether incoming records are copied before the user function sees them
+    private boolean copyInput = true;
+
+    /**
+     * Creates the operator and sets its chaining strategy to ALWAYS.
+     *
+     * @param userFunction
+     *            The user function wrapped by this operator
+     */
+    public ChainableStreamOperator(Function userFunction) {
+        super(userFunction);
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    /**
+     * Initializes the operator with an explicit output collector and input
+     * serializer instead of a task context.
+     *
+     * @param collector
+     *            The collector that receives the output of this operator
+     * @param inSerializer
+     *            The serializer of the incoming records
+     */
+    public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) {
+        this.collector = collector;
+        this.inSerializer = inSerializer;
+        this.objectSerializer = inSerializer.getObjectSerializer();
+    }
+
+    /**
+     * Disables copying of the input records.
+     *
+     * @return This operator, to allow call chaining
+     */
+    public ChainableStreamOperator<IN, OUT> withoutInputCopy() {
+        copyInput = false;
+        return this;
+    }
+
+    /**
+     * Returns a copy of the given input, or the input itself when input
+     * copying has been disabled.
+     */
+    protected IN copyInput(IN input) {
+        if (copyInput) {
+            return copy(input);
+        }
+        return input;
+    }
+
+    @Override
+    public void collect(IN record) {
+        // Records arriving while the operator is not running are dropped
+        if (!isRunning) {
+            return;
+        }
+        nextObject = copyInput(record);
+        callUserFunctionAndLogException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
new file mode 100644
index 0000000..4c997d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+
+/**
+ * Operator that emits, for every input record, the number of records seen so
+ * far (starting at 1).
+ *
+ * @param <IN>
+ *            The type of the counted records
+ */
+public class StreamCounter<IN> extends ChainableStreamOperator<IN, Long> {
+    private static final long serialVersionUID = 1L;
+
+    Long count = 0L;
+
+    public StreamCounter() {
+        super(null);
+    }
+
+    @Override
+    public void run() throws Exception {
+        while (isRunning) {
+            if (readNext() == null) {
+                break;
+            }
+            emitNextCount();
+        }
+    }
+
+    @Override
+    public void collect(IN record) {
+        if (!isRunning) {
+            return;
+        }
+        emitNextCount();
+    }
+
+    /**
+     * Increments the counter and forwards the new value to the output.
+     */
+    private void emitNextCount() {
+        count = count + 1;
+        collector.collect(count);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
new file mode 100644
index 0000000..d2cddf6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * Operator that forwards only those records for which the wrapped
+ * {@link FilterFunction} returns true.
+ *
+ * @param <IN>
+ *            The type of the filtered records
+ */
+public class StreamFilter<IN> extends ChainableStreamOperator<IN, IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    FilterFunction<IN> filterFunction;
+
+    // Result of the most recent filter call
+    private boolean collect;
+
+    public StreamFilter(FilterFunction<IN> filterFunction) {
+        super(filterFunction);
+        this.filterFunction = filterFunction;
+    }
+
+    @Override
+    public void run() throws Exception {
+        while (isRunning) {
+            if (readNext() == null) {
+                break;
+            }
+            callUserFunctionAndLogException();
+        }
+    }
+
+    @Override
+    protected void callUserFunction() throws Exception {
+        collect = filterFunction.filter(nextObject);
+        if (!collect) {
+            return;
+        }
+        collector.collect(nextObject);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
new file mode 100644
index 0000000..a17b162
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+
+/**
+ * Operator that applies a {@link FlatMapFunction} to every input record; the
+ * function writes its results straight into the output collector.
+ *
+ * @param <IN>
+ *            The input type of the operator
+ * @param <OUT>
+ *            The output type of the operator
+ */
+public class StreamFlatMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private FlatMapFunction<IN, OUT> flatMapper;
+
+    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
+        super(flatMapper);
+        this.flatMapper = flatMapper;
+    }
+
+    @Override
+    public void run() throws Exception {
+        while (isRunning) {
+            if (readNext() == null) {
+                break;
+            }
+            callUserFunctionAndLogException();
+        }
+    }
+
+    @Override
+    protected void callUserFunction() throws Exception {
+        final IN input = nextObject;
+        flatMapper.flatMap(input, collector);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
new file mode 100644
index 0000000..fc5f187
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class StreamFold<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ protected FoldFunction<IN, OUT> folder;
+ private OUT accumulator;
+ protected TypeSerializer<OUT> outTypeSerializer;
+
+ public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue,
+ TypeInformation<OUT> outTypeInformation) {
+ super(folder);
+ this.folder = folder;
+ this.accumulator = initialValue;
+ this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+ }
+
+ @Override
+ public void run() throws Exception {
+ while (isRunning && readNext() != null) {
+ callUserFunctionAndLogException();
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+
+ accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextObject);
+ collector.collect(accumulator);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
new file mode 100644
index 0000000..303f1b3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Fold operator that keeps one accumulator per key. For every input record
+ * the accumulator of the record's key (or the initial value, if the key has
+ * no accumulator yet) is folded with the record, stored and emitted.
+ *
+ * @param <IN>
+ *            The input type of the operator
+ * @param <OUT>
+ *            The type of the accumulators and of the emitted records
+ */
+public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private KeySelector<IN, ?> keySelector;
+    private Map<Object, OUT> values;
+    private OUT initialValue;
+
+    /**
+     * @param folder
+     *            The fold function applied to every record
+     * @param keySelector
+     *            Extracts the grouping key from an input record
+     * @param initialValue
+     *            The starting value of the accumulator of every key
+     * @param outTypeInformation
+     *            Type information used to create the accumulator serializer
+     */
+    public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+            OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+        super(folder, initialValue, outTypeInformation);
+        this.keySelector = keySelector;
+        this.initialValue = initialValue;
+        values = new HashMap<Object, OUT>();
+    }
+
+    @Override
+    protected void callUserFunction() throws Exception {
+        // The key is derived from nextObject rather than nextRecord: the
+        // Collector-style collect(IN) entry point of the chainable base class
+        // only populates nextObject, so nextRecord may be unset or stale here.
+        Object key = keySelector.getKey(nextObject);
+
+        // Keys without a stored accumulator start from the initial value
+        OUT stored = values.get(key);
+        OUT base = stored != null ? stored : initialValue;
+
+        OUT folded = folder.fold(outTypeSerializer.copy(base), nextObject);
+        values.put(key, folded);
+        collector.collect(folded);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
new file mode 100644
index 0000000..f5c8f21
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Reduce operator that keeps one reduced value per key. The first record of
+ * a key is stored and emitted unchanged; every later record is reduced with
+ * a copy of the stored value, and the result is stored and emitted.
+ *
+ * @param <IN>
+ *            The type of the reduced records
+ */
+public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private KeySelector<IN, ?> keySelector;
+    private Map<Object, IN> values;
+
+    /**
+     * @param reducer
+     *            The reduce function applied per key
+     * @param keySelector
+     *            Extracts the grouping key from an input record
+     */
+    public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
+        super(reducer);
+        this.keySelector = keySelector;
+        values = new HashMap<Object, IN>();
+    }
+
+    @Override
+    protected void callUserFunction() throws Exception {
+        Object key = keySelector.getKey(nextObject);
+        IN stored = values.get(key);
+
+        IN result;
+        if (stored == null) {
+            // First record seen for this key
+            result = nextObject;
+        } else {
+            result = reducer.reduce(copy(stored), nextObject);
+        }
+
+        values.put(key, result);
+        collector.collect(result);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
new file mode 100644
index 0000000..9f1db1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class StreamMap<IN, OUT> extends ChainableStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private MapFunction<IN, OUT> mapper;
+
+ public StreamMap(MapFunction<IN, OUT> mapper) {
+ super(mapper);
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void run() throws Exception {
+ while (isRunning && readNext() != null) {
+ callUserFunctionAndLogException();
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ collector.collect(mapper.map(nextObject));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
new file mode 100644
index 0000000..7ec0b0b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The StreamOperator represents the base class for all operators in the
+ * streaming topology.
+ *
+ * @param <OUT>
+ * The output type of the operator
+ */
+public abstract class StreamOperator<IN, OUT> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(StreamOperator.class);
+
+ protected StreamTaskContext<OUT> taskContext;
+
+ protected ExecutionConfig executionConfig = null;
+
+ protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
+ protected StreamRecordSerializer<IN> inSerializer;
+ protected TypeSerializer<IN> objectSerializer;
+ protected StreamRecord<IN> nextRecord;
+ protected IN nextObject;
+ protected boolean isMutable;
+
+ public Collector<OUT> collector;
+ protected Function userFunction;
+ protected volatile boolean isRunning;
+
+ private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+
+ public StreamOperator(Function userFunction) {
+ this.userFunction = userFunction;
+ }
+
+ /**
+ * Initializes the {@link StreamOperator} for input and output handling
+ *
+ * @param taskContext
+ * StreamTaskContext representing the vertex
+ */
+ public void setup(StreamTaskContext<OUT> taskContext) {
+ this.collector = taskContext.getOutputCollector();
+ this.recordIterator = taskContext.getIndexedInput(0);
+ this.inSerializer = taskContext.getInputSerializer(0);
+ if (this.inSerializer != null) {
+ this.nextRecord = inSerializer.createInstance();
+ this.objectSerializer = inSerializer.getObjectSerializer();
+ }
+ this.taskContext = taskContext;
+ this.executionConfig = taskContext.getExecutionConfig();
+ }
+
+ /**
+ * Method that will be called when the operator starts, should encode the
+ * processing logic
+ */
+ public abstract void run() throws Exception;
+
+ /*
+ * Reads the next record from the reader iterator and stores it in the
+ * nextRecord variable
+ */
+ protected StreamRecord<IN> readNext() throws IOException {
+ this.nextRecord = inSerializer.createInstance();
+ try {
+ nextRecord = recordIterator.next(nextRecord);
+ try {
+ nextObject = nextRecord.getObject();
+ } catch (NullPointerException e) {
+ // end of stream
+ }
+ return nextRecord;
+ } catch (IOException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record due to: "
+ + StringUtils.stringifyException(e));
+ } else {
+ // Task already cancelled do nothing
+ return null;
+ }
+ } catch (IllegalStateException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record due to: "
+ + StringUtils.stringifyException(e));
+ } else {
+ // Task already cancelled do nothing
+ return null;
+ }
+ }
+ }
+
+ /**
+ * The call of the user implemented function should be implemented here
+ */
+ protected void callUserFunction() throws Exception {
+ }
+
+ /**
+ * Method for logging exceptions thrown during the user function call
+ */
+ protected void callUserFunctionAndLogException() {
+ try {
+ callUserFunction();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Open method to be used if the user defined function extends the
+ * RichFunction class
+ *
+ * @param parameters
+ * The configuration parameters for the operator
+ */
+ public void open(Configuration parameters) throws Exception {
+ isRunning = true;
+ FunctionUtils.openFunction(userFunction, parameters);
+ }
+
+ /**
+ * Close method to be used if the user defined function extends the
+ * RichFunction class
+ *
+ */
+ public void close() {
+ isRunning = false;
+ collector.close();
+ try {
+ FunctionUtils.closeFunction(userFunction);
+ } catch (Exception e) {
+ throw new RuntimeException("Error when closing the function: " + e.getMessage());
+ }
+ }
+
+ public void cancel() {
+ isRunning = false;
+ }
+
+ public void setRuntimeContext(RuntimeContext t) {
+ FunctionUtils.setFunctionRuntimeContext(userFunction, t);
+ }
+
+ protected IN copy(IN record) {
+ return objectSerializer.copy(record);
+ }
+
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ if (strategy == ChainingStrategy.ALWAYS) {
+ if (!(this instanceof ChainableStreamOperator)) {
+ throw new RuntimeException(
+ "Operator needs to extend ChainableOperator to be chained");
+ }
+ }
+ this.chainingStrategy = strategy;
+ }
+
+ public ChainingStrategy getChainingStrategy() {
+ return chainingStrategy;
+ }
+
+ public static enum ChainingStrategy {
+ ALWAYS, NEVER, HEAD;
+ }
+
+ public Function getUserFunction() {
+ return userFunction;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
new file mode 100644
index 0000000..7f8b10d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+public class StreamProject<IN, OUT extends Tuple> extends ChainableStreamOperator<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ transient OUT outTuple;
+ TypeSerializer<OUT> outTypeSerializer;
+ TypeInformation<OUT> outTypeInformation;
+ int[] fields;
+ int numFields;
+
+ public StreamProject(int[] fields, TypeInformation<OUT> outTypeInformation) {
+ super(null);
+ this.fields = fields;
+ this.numFields = this.fields.length;
+ this.outTypeInformation = outTypeInformation;
+ }
+
+ @Override
+ public void run() throws Exception {
+ while (isRunning && readNext() != null) {
+ callUserFunctionAndLogException();
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ for (int i = 0; i < this.numFields; i++) {
+ outTuple.setField(((Tuple)nextObject).getField(fields[i]), i);
+ }
+ collector.collect(outTuple);
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+ this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+ outTuple = outTypeSerializer.createInstance();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
new file mode 100644
index 0000000..179d690
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+public class StreamReduce<IN> extends ChainableStreamOperator<IN, IN> {
+ private static final long serialVersionUID = 1L;
+
+ protected ReduceFunction<IN> reducer;
+ private IN currentValue;
+
+ public StreamReduce(ReduceFunction<IN> reducer) {
+ super(reducer);
+ this.reducer = reducer;
+ currentValue = null;
+ }
+
+ @Override
+ public void run() throws Exception {
+ while (isRunning && readNext() != null) {
+ callUserFunctionAndLogException();
+ }
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+
+ if (currentValue != null) {
+ currentValue = reducer.reduce(copy(currentValue), nextObject);
+ } else {
+ currentValue = nextObject;
+
+ }
+ collector.collect(currentValue);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
new file mode 100644
index 0000000..d1f93d1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+/**
+ * Operator that passes every incoming record to a SinkFunction.
+ *
+ * @param <IN>
+ *            type of the consumed records
+ */
+public class StreamSink<IN> extends ChainableStreamOperator<IN, IN> {
+    private static final long serialVersionUID = 1L;
+
+    private SinkFunction<IN> sinkFunction;
+
+    /**
+     * @param sinkFunction
+     *            function invoked for every record
+     */
+    public StreamSink(SinkFunction<IN> sinkFunction) {
+        super(sinkFunction);
+        this.sinkFunction = sinkFunction;
+    }
+
+    /**
+     * Processes records until the operator stops running or readNext()
+     * returns null.
+     */
+    @Override
+    public void run() throws Exception {
+        while (isRunning) {
+            if (readNext() == null) {
+                break;
+            }
+            callUserFunctionAndLogException();
+        }
+    }
+
+    /** Invokes the sink function with the current record. */
+    @Override
+    protected void callUserFunction() throws Exception {
+        sinkFunction.invoke(nextObject);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
new file mode 100644
index 0000000..8c834f5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+public class StreamSource<OUT> extends StreamOperator<OUT, OUT> implements Serializable { // operator that wraps a SourceFunction
+
+ private static final long serialVersionUID = 1L;
+
+ private SourceFunction<OUT> sourceFunction; // same instance that is handed to the superclass constructor
+
+ public StreamSource(SourceFunction<OUT> sourceFunction) {
+ super(sourceFunction);
+ this.sourceFunction = sourceFunction;
+ }
+
+ @Override
+ public void run() { // no read loop here: the user function is called exactly once
+ callUserFunctionAndLogException();
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ sourceFunction.run(collector); // the source function receives the operator's collector
+ }
+
+ @Override
+ public void cancel() {
+ super.cancel();
+ sourceFunction.cancel(); // the cancel call is also forwarded to the source function
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
new file mode 100644
index 0000000..004a17a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+
+public class CoStreamFlatMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private CoFlatMapFunction<IN1, IN2, OUT> flatMapper;
+
+ public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+ super(flatMapper);
+ this.flatMapper = flatMapper;
+ }
+
+ @Override
+ public void handleStream1() throws Exception {
+ callUserFunctionAndLogException1();
+ }
+
+ @Override
+ public void handleStream2() throws Exception {
+ callUserFunctionAndLogException2();
+ }
+
+ @Override
+ protected void callUserFunction1() throws Exception {
+ flatMapper.flatMap1(reuse1.getObject(), collector);
+
+ }
+
+ @Override
+ protected void callUserFunction2() throws Exception {
+ flatMapper.flatMap2(reuse2.getObject(), collector);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
new file mode 100644
index 0000000..2ed3b2e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+
+/**
+ * Two-input reduce operator that keeps one running aggregate per key and per
+ * input. After each record the aggregate of the record's key is updated,
+ * mapped with map1/map2 of the CoReduceFunction and emitted.
+ */
+public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    protected KeySelector<IN1, ?> keySelector1;
+    protected KeySelector<IN2, ?> keySelector2;
+    // Latest aggregate per key, one map for each input.
+    private Map<Object, IN1> values1;
+    private Map<Object, IN2> values2;
+    // Results written by callUserFunction1() / callUserFunction2().
+    IN1 reduced1;
+    IN2 reduced2;
+
+    /**
+     * @param coReducer
+     *            function providing reduce1/reduce2 and map1/map2
+     * @param keySelector1
+     *            extracts the key of first-input records
+     * @param keySelector2
+     *            extracts the key of second-input records
+     */
+    public CoStreamGroupedReduce(CoReduceFunction<IN1, IN2, OUT> coReducer,
+            KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
+        super(coReducer);
+        this.coReducer = coReducer;
+        this.keySelector1 = keySelector1;
+        this.keySelector2 = keySelector2;
+        this.values1 = new HashMap<Object, IN1>();
+        this.values2 = new HashMap<Object, IN2>();
+    }
+
+    /**
+     * Updates and emits the aggregate for the key of the current first-input
+     * record. The first record of a key is stored without calling reduce1.
+     */
+    @Override
+    public void handleStream1() throws Exception {
+        Object key = reuse1.getKey(keySelector1);
+        currentValue1 = values1.get(key);
+        nextValue1 = reuse1.getObject();
+
+        IN1 latest;
+        if (currentValue1 == null) {
+            latest = nextValue1;
+        } else {
+            callUserFunctionAndLogException1();
+            latest = reduced1;
+        }
+        values1.put(key, latest);
+        collector.collect(coReducer.map1(latest));
+    }
+
+    /**
+     * Updates and emits the aggregate for the key of the current second-input
+     * record. The first record of a key is stored without calling reduce2.
+     */
+    @Override
+    public void handleStream2() throws Exception {
+        Object key = reuse2.getKey(keySelector2);
+        currentValue2 = values2.get(key);
+        nextValue2 = reuse2.getObject();
+
+        IN2 latest;
+        if (currentValue2 == null) {
+            latest = nextValue2;
+        } else {
+            callUserFunctionAndLogException2();
+            latest = reduced2;
+        }
+        values2.put(key, latest);
+        collector.collect(coReducer.map2(latest));
+    }
+
+    /** Reduces the stored first-input aggregate with the current record. */
+    @Override
+    protected void callUserFunction1() throws Exception {
+        reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+    }
+
+    /** Reduces the stored second-input aggregate with the current record. */
+    @Override
+    protected void callUserFunction2() throws Exception {
+        reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
new file mode 100644
index 0000000..932438b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+
+public class CoStreamMap<IN1, IN2, OUT> extends CoStreamOperator<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private CoMapFunction<IN1, IN2, OUT> mapper;
+
+ public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
+ super(mapper);
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void handleStream1() throws Exception {
+ callUserFunctionAndLogException1();
+ }
+
+ @Override
+ public void handleStream2() throws Exception {
+ callUserFunctionAndLogException2();
+ }
+
+ @Override
+ protected void callUserFunction1() throws Exception {
+ collector.collect(mapper.map1(reuse1.getObject()));
+
+ }
+
+ @Override
+ protected void callUserFunction2() throws Exception {
+ collector.collect(mapper.map2(reuse2.getObject()));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
new file mode 100644
index 0000000..214cb17
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CoStreamOperator<IN1, IN2, OUT> extends StreamOperator<IN1, OUT> {
+
+ public CoStreamOperator(Function userFunction) {
+ super(userFunction);
+ }
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(CoStreamOperator.class);
+
+ protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
+ protected StreamRecord<IN1> reuse1;
+ protected StreamRecord<IN2> reuse2;
+ protected StreamRecordSerializer<IN1> srSerializer1;
+ protected StreamRecordSerializer<IN2> srSerializer2;
+ protected TypeSerializer<IN1> serializer1;
+ protected TypeSerializer<IN2> serializer2;
+
+ @Override
+ public void setup(StreamTaskContext<OUT> taskContext) {
+ this.collector = taskContext.getOutputCollector();
+
+ this.recordIterator = taskContext.getCoReader();
+
+ this.srSerializer1 = taskContext.getInputSerializer(0);
+ this.srSerializer2 = taskContext.getInputSerializer(1);
+
+ this.reuse1 = srSerializer1.createInstance();
+ this.reuse2 = srSerializer2.createInstance();
+
+ this.serializer1 = srSerializer1.getObjectSerializer();
+ this.serializer2 = srSerializer2.getObjectSerializer();
+ }
+
+ protected void resetReuseAll() {
+ this.reuse1 = srSerializer1.createInstance();
+ this.reuse2 = srSerializer2.createInstance();
+ }
+
+ protected void resetReuse1() {
+ this.reuse1 = srSerializer1.createInstance();
+ }
+
+ protected void resetReuse2() {
+ this.reuse2 = srSerializer2.createInstance();
+ }
+
+ @Override
+ public void run() throws Exception {
+ while (isRunning) {
+ int next;
+ try {
+ next = recordIterator.next(reuse1, reuse2);
+ } catch (IOException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ } catch (IllegalStateException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ }
+
+ if (next == 0) {
+ break;
+ } else if (next == 1) {
+ initialize1();
+ handleStream1();
+ resetReuse1();
+ } else {
+ initialize2();
+ handleStream2();
+ resetReuse2();
+ }
+ }
+ }
+
+ protected abstract void handleStream1() throws Exception;
+
+ protected abstract void handleStream2() throws Exception;
+
+ protected abstract void callUserFunction1() throws Exception;
+
+ protected abstract void callUserFunction2() throws Exception;
+
+ protected void initialize1() {
+
+ };
+
+ protected void initialize2() {
+
+ };
+
+ protected void callUserFunctionAndLogException1() {
+ try {
+ callUserFunction1();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void callUserFunctionAndLogException2() {
+ try {
+ callUserFunction2();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Calling user function failed due to: {}",
+ StringUtils.stringifyException(e));
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+}