Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:00 UTC

[44/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
deleted file mode 100644
index cebb538..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ /dev/null
@@ -1,116 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-core</artifactId>
-	<name>flink-streaming-core</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-math</artifactId>
-			<version>2.2</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.sling</groupId>
-			<artifactId>org.apache.sling.commons.json</artifactId>
-			<version>2.0.6</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-math3</artifactId>
-			<version>3.5</version>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- disable fork reuse for the streaming project, because of
-			incorrect declaration of tests -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<reuseForks>false</reuseForks>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
deleted file mode 100644
index db46d00..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The checkpointing mode defines what consistency guarantees the system gives in the presence of
- * failures.
- * 
- * <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
- * processing are repeated. For stateful operations and functions, the checkpointing mode defines
- * whether the system draws checkpoints such that a recovery behaves as if the operators/functions
- * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
- * in a simpler fashion that typically encounters some duplicates upon recovery
- * ({@link #AT_LEAST_ONCE}).</p>
- */
-public enum CheckpointingMode {
-
-	/**
-	 * Sets the checkpointing mode to "exactly once". This mode means that the system will
-	 * checkpoint the operator and user function state in such a way that, upon recovery,
-	 * every record will be reflected exactly once in the operator state.
-	 * 
-	 * <p>For example, if a user function counts the number of elements in a stream, 
-	 * this number will consistently be equal to the number of actual elements in the stream,
-	 * regardless of failures and recovery.</p>
-	 * 
-	 * <p>Note that this does not mean that each record flows through the streaming data flow
-	 * only once. It means that upon recovery, the state of operators/functions is restored such
- * that the resumed data streams pick up exactly after the last modification to the state.</p>
-	 *  
-	 * <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
-	 * external systems (only state in Flink's operators and user functions). The reason for that
-	 * is that a certain level of "collaboration" is required between two systems to achieve
-	 * exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
-	 * this collaboration.</p>
-	 * 
-	 * <p>This mode sustains high throughput. Depending on the data flow graph and operations,
-	 * this mode may increase the record latency, because operators need to align their input
-	 * streams, in order to create a consistent snapshot point. The latency increase for simple
-	 * dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
-	 * latency remains small, but the slowest records typically have an increased latency.</p>
-	 */
-	EXACTLY_ONCE,
-
-	/**
-	 * Sets the checkpointing mode to "at least once". This mode means that the system will
-	 * checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
-	 * some records may be reflected multiple times in the operator state.
-	 * 
-	 * <p>For example, if a user function counts the number of elements in a stream, 
- * this number will be equal to, or larger than, the actual number of elements in the stream,
-	 * in the presence of failure and recovery.</p>
-	 * 
-	 * <p>This mode has minimal impact on latency and may be preferable in very-low latency
- * scenarios, where a sustained very-low latency (such as a few milliseconds) is needed,
-	 * and where occasional duplicate messages (on recovery) do not matter.</p>
-	 */
-	AT_LEAST_ONCE
-}
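
A minimal usage sketch for this enum (illustrative only; it assumes the two-argument enableCheckpointing(interval, mode) overload of StreamExecutionEnvironment, and the 1000 ms interval is arbitrary):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointingModeExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Draw a checkpoint every 1000 ms; inputs are aligned so that
            // restored state reflects each record exactly once.
            env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
            // ... define sources, transformations, and sinks, then:
            // env.execute("checkpointed job");
        }
    }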

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
deleted file mode 100644
index 125ca65..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-/**
- * The time characteristic defines how the system determines time for time-dependent
- * ordering and for operations that depend on time (such as time windows).
- */
-public enum TimeCharacteristic {
-
-	/**
-	 * Processing time for operators means that the operator uses the system clock of the machine
-	 * to determine the current time of the data stream. Processing-time windows trigger based
-	 * on wall-clock time and include whatever elements happen to have arrived at the operator at
-	 * that point in time.
-	 * <p>
- * Using processing time for window operations generally yields non-deterministic results,
- * because the contents of the windows depend on the speed at which elements arrive. It is, however,
-	 * the cheapest method of forming windows and the method that introduces the least latency.
-	 */
-	ProcessingTime,
-
-	/**
-	 * Ingestion time means that the time of each individual element in the stream is determined
-	 * when the element enters the Flink streaming data flow. Operations like windows group the
-	 * elements based on that time, meaning that processing speed within the streaming dataflow
-	 * does not affect windowing, but only the speed at which sources receive elements.
-	 * <p>
-	 * Ingestion time is often a good compromise between processing time and event time.
- * It does not need any special manual form of watermark generation, and events are typically
- * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
-	 * only be introduced by streaming shuffles or split/join/union operations. The fact that elements
-	 * are not very much out-of-order means that the latency increase is moderate, compared to event
-	 * time.
-	 */
-	IngestionTime,
-
-	/**
-	 * Event time means that the time of each individual element in the stream (also called event)
-	 * is determined by the event's individual custom timestamp. These timestamps either exist in the
-	 * elements from before they entered the Flink streaming dataflow, or are user-assigned at the sources.
-	 * The big implication of this is that elements arrive in the sources and in all operators generally
-	 * out of order, meaning that elements with earlier timestamps may arrive after elements with
-	 * later timestamps.
-	 * <p>
-	 * Operators that window or order data with respect to event time must buffer data until they can
-	 * be sure that all timestamps for a certain time interval have been received. This is handled by
- * the so-called "time watermarks".
-	 * <p>
-	 * Operations based on event time are very predictable - the result of windowing operations
-	 * is typically identical no matter when the window is executed and how fast the streams operate.
-	 * At the same time, the buffering and tracking of event time is also costlier than operating
-	 * with processing time, and typically also introduces more latency. The amount of extra
-	 * cost depends mostly on how much out of order the elements arrive, i.e., how long the time span
-	 * between the arrival of early and late elements is. With respect to the "time watermarks", this
-	 * means that the cost typically depends on how early or late the watermarks can be generated
-	 * for their timestamp.
-	 * <p>
- * In relation to {@link #IngestionTime}, event time is similar, but refers to the event's
- * original time, rather than the time assigned at the data source. Practically, that means that
- * event time generally carries more meaning, but also that it takes longer to determine that all
-	 * elements for a certain time have arrived.
-	 */
-	EventTime
-}
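
A minimal sketch of choosing a time characteristic for a job (illustrative; it assumes a setStreamTimeCharacteristic setter on StreamExecutionEnvironment, matching the getStreamTimeCharacteristic getter used elsewhere in this commit):

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TimeCharacteristicExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Windows and other time-based operations order elements by their own
            // timestamps; sources must then assign timestamps and emit watermarks.
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        }
    }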

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
deleted file mode 100644
index c2d2182..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.checkpoint;
-
-/**
- * This interface must be implemented by functions/operations that want to receive
- * a commit notification once a checkpoint has been completely acknowledged by all
- * participants.
- */
-public interface CheckpointNotifier {
-
-	/**
-	 * This method is called as a notification once a distributed checkpoint has been completed.
-	 * 
-	 * Note that any exception thrown in this method will no longer cause the checkpoint to
-	 * fail, since the checkpoint has already been completed.
-	 * 
-	 * @param checkpointId The ID of the checkpoint that has been completed.
-	 * @throws Exception Thrown if the notification handling fails; the completed checkpoint is unaffected.
-	 */
-	void notifyCheckpointComplete(long checkpointId) throws Exception;
-}
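
A minimal sketch of using this notification to defer an external side effect (the class name and the commit action are hypothetical; in practice the interface is implemented alongside a source or sink function):

    import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;

    // Hypothetical example: delays an external commit until the checkpoint
    // covering it has been acknowledged by all participants.
    public class CommitOnNotify implements CheckpointNotifier {

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // Safe point for external effects (e.g. committing offsets): the
            // checkpoint with this ID can no longer fail.
            System.out.println("Checkpoint " + checkpointId + " is complete");
        }
    }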

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
deleted file mode 100644
index ac1cbfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import java.io.Serializable;
-
-/**
- * This interface must be implemented by functions that have state that needs to be
- * checkpointed. The functions get a call whenever a checkpoint should take place
- * and return a snapshot of their state, which will be checkpointed.
- * 
- * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the
- * state is written, the function is not called, so the function need not return a
- * copy of its state, but may return a reference to its state. Functions that can
- * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
- * 
- * @param <T> The type of the operator state.
- */
-public interface Checkpointed<T extends Serializable> {
-
-	/**
-	 * Gets the current state of the function or operator. The state must reflect the result of all
-	 * prior invocations of this function.
-	 * 
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param checkpointTimestamp The timestamp of the checkpoint, as derived by
-	 *                            System.currentTimeMillis() on the JobManager.
-	 *                            
-	 * @return A snapshot of the operator state.
-	 * 
-	 * @throws Exception Thrown if the creation of the state object failed. This causes the
-	 *                   checkpoint to fail. The system may decide to fail the operation (and trigger
-	 *                   recovery), or to discard this checkpoint attempt and to continue running
-	 *                   and to try again with the next checkpoint attempt.
-	 */
-	T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
-
-	/**
-	 * Restores the state of the function or operator to that of a previous checkpoint.
-	 * This method is invoked when a function is executed as part of a recovery run.
-	 *
-	 * Note that restoreState() is called before open().
-	 *
-	 * @param state The state to be restored. 
-	 */
-	void restoreState(T state) throws Exception;
-}
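
A minimal sketch of a stateful function implementing this interface (the class is hypothetical; a counter whose value is snapshotted and restored):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;

    // Hypothetical example: maps each element to a running count and checkpoints it.
    public class CountingMapper<T> implements MapFunction<T, Long>, Checkpointed<Long> {

        private long count;

        @Override
        public Long map(T value) {
            return ++count;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            // Synchronous snapshot: the function is not called while this runs,
            // so the current value can be returned without copying.
            return count;
        }

        @Override
        public void restoreState(Long state) {
            // Invoked before open() during a recovery run.
            count = state;
        }
    }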

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
deleted file mode 100644
index 4bd89c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.checkpoint;
-
-import java.io.Serializable;
-
-/**
- * This interface marks a function/operator as <i>asynchronously checkpointed</i>.
- * Similar to the {@link Checkpointed} interface, the function must produce a
- * snapshot of its state. However, the function must be able to continue working
- * and mutating its state without mutating the returned state snapshot.
- * 
- * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the
- * point of the checkpointed function/operator to continue running while the checkpoint
- * is in progress.</p>
- * 
- * <p>To be able to support asynchronous snapshots, the state returned by the
- * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
- * of the actual state.</p>
- */
-public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {}
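
A minimal sketch of the copy-on-snapshot pattern this interface calls for (hypothetical class; the snapshot is a private copy so the live buffer may keep changing while the checkpoint is written):

    import java.util.ArrayList;

    import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

    // Hypothetical example: the state is a buffer of elements; the snapshot
    // is a shadow copy decoupled from ongoing mutations.
    public class BufferingState implements CheckpointedAsynchronously<ArrayList<String>> {

        private final ArrayList<String> buffer = new ArrayList<String>();

        public void add(String element) {
            buffer.add(element);
        }

        @Override
        public ArrayList<String> snapshotState(long checkpointId, long checkpointTimestamp) {
            // Copy, so the checkpoint writer and further add() calls do not interfere.
            return new ArrayList<String>(buffer);
        }

        @Override
        public void restoreState(ArrayList<String> state) {
            buffer.clear();
            buffer.addAll(state);
        }
    }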

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
deleted file mode 100644
index 7034b11..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
-
-	public BroadcastOutputSelectorWrapper() {
-		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
-	}
-	
-	@Override
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
-		outputs.add(output);
-	}
-
-	@Override
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
-		return outputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
deleted file mode 100644
index 84558fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
-
-	private List<OutputSelector<OUT>> outputSelectors;
-
-	private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
-	private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
-
-	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
-		this.outputSelectors = outputSelectors;
-		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
-		this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
-	}
-	
-	@Override
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
-		List<String> selectedNames = edge.getSelectedNames();
-
-		if (selectedNames.isEmpty()) {
-			selectAllOutputs.add(output);
-		}
-		else {
-			for (String selectedName : selectedNames) {
-				if (!outputMap.containsKey(selectedName)) {
-					outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
-					outputMap.get(selectedName).add(output);
-				}
-				else {
-					if (!outputMap.get(selectedName).contains(output)) {
-						outputMap.get(selectedName).add(output);
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
-		Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
-
-		for (OutputSelector<OUT> outputSelector : outputSelectors) {
-			Iterable<String> outputNames = outputSelector.select(record);
-
-			for (String outputName : outputNames) {
-				List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
-
-				if (outputList != null) {
-					selectedOutputs.addAll(outputList);
-				} else {
-					LOG.error("Cannot emit because no output is selected with the name: {}", outputName);
-				}
-			}
-		}
-
-		return selectedOutputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
deleted file mode 100644
index 9c6eede..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitStream} will run through this operator to select outputs.
- * 
- * @param <OUT>
- *            Type parameter of the split values.
- */
-public interface OutputSelector<OUT> extends Serializable {
-	/**
-	 * Method for selecting output names for the emitted objects when using the
-	 * {@link SingleOutputStreamOperator#split} method. The values will be
-	 * emitted only to output names which are contained in the returned
-	 * iterable.
-	 * 
-	 * @param value
-	 *            Output object for which the output selection should be made.
-	 */
-	public Iterable<String> select(OUT value);
-}
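
A minimal sketch of an OutputSelector (hypothetical class; it routes integers to an "even" or "odd" output, which a SplitStream would then consume via select("even") or select("odd")):

    import java.util.Collections;

    import org.apache.flink.streaming.api.collector.selector.OutputSelector;

    // Hypothetical example: selects one of two named outputs per record.
    public class EvenOddSelector implements OutputSelector<Integer> {

        @Override
        public Iterable<String> select(Integer value) {
            return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
        }
    }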

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
deleted file mode 100644
index f25c995..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public interface OutputSelectorWrapper<OUT> extends Serializable {
-
-	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
-
-	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
deleted file mode 100644
index dca2ede..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.List;
-
-public class OutputSelectorWrapperFactory {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> outputSelectors) {
-		if (outputSelectors.size() == 0) {
-			return new BroadcastOutputSelectorWrapper();
-		} else {
-			return new DirectedOutputSelectorWrapper(outputSelectors);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
deleted file mode 100644
index 7191304..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.EvictingNonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.NonKeyedWindowOperator;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-
-/**
- * An {@code AllWindowedStream} represents a data stream where the stream of
- * elements is split into windows based on a
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
- * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * If an {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} is specified, it will be
- * used to evict elements from the window after
- * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
- * When using an evictor, window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
- *
- * <p>
- * Note that the {@code AllWindowedStream} is purely an API construct; at runtime
- * the {@code AllWindowedStream} will be collapsed together with the
- * operation over the window into a single operation.
- *
- * @param <T> The type of elements in the stream.
- * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
- */
-public class AllWindowedStream<T, W extends Window> {
-
-	/** The data stream that is windowed by this stream */
-	private final DataStream<T> input;
-
-	/** The window assigner */
-	private final WindowAssigner<? super T, W> windowAssigner;
-
-	/** The trigger that is used for window evaluation/emission. */
-	private Trigger<? super T, ? super W> trigger;
-
-	/** The evictor that is used for evicting elements before window evaluation. */
-	private Evictor<? super T, ? super W> evictor;
-
-
-	public AllWindowedStream(DataStream<T> input,
-			WindowAssigner<? super T, W> windowAssigner) {
-		this.input = input;
-		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
-	}
-
-	/**
-	 * Sets the {@code Trigger} that should be used to trigger window emission.
-	 */
-	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
-		this.trigger = trigger;
-		return this;
-	}
-
-	/**
-	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-	 *
-	 * <p>
-	 * Note: When using an evictor, window performance will degrade significantly, since
-	 * pre-aggregation of window results cannot be used.
-	 */
-	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
-		this.evictor = evictor;
-		return this;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Operations on the windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies a reduce function to the window. The reduce function is called for each evaluation
-	 * of the window. The output of the reduce function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * This window will try to pre-aggregate data as much as the window policies permit. For example,
-	 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element
-	 * is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
-	 * so a few elements are stored (one per slide interval).
-	 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
-	 * aggregation tree.
-	 * 
-	 * @param function The reduce function.
-	 * @return The data stream that is the result of applying the reduce function to the window. 
-	 */
-	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "Reduce at " + callLocation;
-
-		SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
-		if (result != null) {
-			return result;
-		}
-
-		String opName = "NonParallelTriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		OneInputStreamOperator<T, T> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					new ReduceAllWindowFunction<W, T>(function),
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-
-		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
-					new ReduceAllWindowFunction<W, T>(function),
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, input.getType(), operator).setParallelism(1);
-	}
-
-	/**
-	 * Applies the given fold function to each window. The fold function is called for each
-	 * evaluation of the window. The output of the fold function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * @param function The fold function.
-	 * @return The data stream that is the result of applying the fold function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
-				Utils.getCallLocationName(), true);
-
-		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
-	}
-
-	/**
-	 * Applies the given fold function to each window. The fold function is called for each
-	 * evaluation of the window. The output of the fold function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * @param function The fold function.
-	 * @return The data stream that is the result of applying the fold function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
-	}
-
-	/**
-	 * Applies a window function to the window. The window function is called for each evaluation
-	 * of the window. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * Note that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 * 
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, AllWindowFunction.class, true, true, inType, null, false);
-
-		return apply(function, resultType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each evaluation
-	 * of the window. The output of the window function is interpreted
-	 * as a regular non-windowed stream.
-	 * <p>
-	 * Note that this function requires that all data in the windows is buffered until the window
-	 * is evaluated, as the function provides no means of pre-aggregation.
-	 *
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
-		//clean the closure
-		function = input.getExecutionEnvironment().clean(function);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
-
-		SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
-		if (result != null) {
-			return result;
-		}
-
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		NonKeyedWindowOperator<T, R, W> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-
-		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, resultType, operator).setParallelism(1);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
-	 * @param function The window function.
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function) {
-		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, AllWindowFunction.class, true, true, inType, null, false);
-
-		return apply(preAggregator, function, resultType);
-	}
-
-	/**
-	 * Applies the given window function to each window. The window function is called for each
-	 * evaluation of the window. The output of the window function is
-	 * interpreted as a regular non-windowed stream.
-	 *
-	 * <p>
-	 * Arriving data is pre-aggregated using the given pre-aggregation reducer.
-	 *
-	 * @param preAggregator The reduce function that is used for pre-aggregation
-	 * @param function The window function.
-	 * @param resultType Type information for the result type of the window function
-	 * @return The data stream that is the result of applying the window function to the window.
-	 */
-	public <R> SingleOutputStreamOperator<R, ?> apply(ReduceFunction<T> preAggregator, AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
-		//clean the closures
-		function = input.getExecutionEnvironment().clean(function);
-		preAggregator = input.getExecutionEnvironment().clean(preAggregator);
-
-		String callLocation = Utils.getCallLocationName();
-		String udfName = "WindowApply at " + callLocation;
-
-		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
-
-		OneInputStreamOperator<T, R> operator;
-
-		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		if (evictor != null) {
-			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new HeapWindowBuffer.Factory<T>(),
-					function,
-					trigger,
-					evictor).enableSetProcessingTime(setProcessingTime);
-
-		} else {
-			operator = new NonKeyedWindowOperator<>(windowAssigner,
-					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(preAggregator),
-					function,
-					trigger).enableSetProcessingTime(setProcessingTime);
-		}
-
-		return input.transform(opName, resultType, operator).setParallelism(1);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Aggregations on the windows
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Applies an aggregation that sums every window of the data stream at the
-	 * given position.
-	 *
-	 * @param positionToSum The position in the tuple/array to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
-		return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that sums the pojo data stream at
-	 * the given field for every window.
-	 *
-	 * <p>
-	 * A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * stream's underlying type. A dot can be used to drill down into objects,
-	 * as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field to sum
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> sum(String field) {
-		return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum value of every window
-	 * of the data stream at the given position.
-	 *
-	 * @param positionToMin The position to minimize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum value of the pojo data
-	 * stream at the given field expression for every window.
-	 *
-	 * <p>
-	 * A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> min(String field) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 *
-	 * @param positionToMinBy
-	 *            The position to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 *
-	 * @param positionToMinBy The position to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 *
-	 * @param positionToMinBy The position to minimize
-	 * @param first If true, then the operator returns the first element with the minimum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @param first If true, then in case of field equality the first object will be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum value of every window of
-	 * the data stream at the given position.
-	 *
-	 * @param positionToMax The position to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum value of the pojo data
-	 * stream at the given field expression for every window. A field expression
-	 * is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream DataStreams} underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> max(String field) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 *
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 *
-	 * @param positionToMaxBy The position to maximize by
-	 * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 *
-	 * @param field The field expression based on which the aggregation will be applied.
-	 * @param first If True then in case of field equality the first object will be returned
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
-	}
-
-	private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
-		return reduce(aggregator);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-
-	private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
-			Function function,
-			TypeInformation<R> resultType,
-			String functionName) {
-
-		// TODO: add once non-parallel fast aligned time windows operator is ready
-		return null;
-	}
-
-	public StreamExecutionEnvironment getExecutionEnvironment() {
-		return input.getExecutionEnvironment();
-	}
-
-	public TypeInformation<T> getInputType() {
-		return input.getType();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
deleted file mode 100644
index d1da783..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped.
- * A streaming co-group operation is evaluated over elements in a window.
- *
- * <p>
- * To finalize co-group operation you also need to specify a {@link KeySelector} for
- * both the first and second input and a {@link WindowAssigner}.
- *
- * <p>
- * Note: Right now, the groups are being built in memory so you need to ensure that they don't
- * get too big. Otherwise the JVM might crash.
- *
- * <p>
- * Example:
- *
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> one = ...;
- * DataStream<Tuple2<String, Integer>> two = ...;
- *
- * DataStream<T> result = one.coGroup(two)
- *     .where(new MyFirstKeySelector())
- *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
- *     .apply(new MyCoGroupFunction());
- * } </pre>
- */
-public class CoGroupedStreams<T1, T2> {
-
-	/** The first input stream */
-	private final DataStream<T1> input1;
-
-	/** The second input stream */
-	private final DataStream<T2> input2;
-
-	/**
-	 * Creates new CoGroped data streams, which are the first step towards building a streaming co-group.
-	 * 
-	 * @param input1 The first data stream.
-	 * @param input2 The second data stream.
-	 */
-	public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
-		this.input1 = requireNonNull(input1);
-		this.input2 = requireNonNull(input2);
-	}
-
-	/**
-	 * Specifies a {@link KeySelector} for elements from the first input.
-	 */
-	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
-		TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
-		return new Where<>(input1.clean(keySelector), keyType);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * CoGrouped streams that have the key for one side defined.
-	 * 
-	 * @param <KEY> The type of the key.
-	 */
-	public class Where<KEY> {
-
-		private final KeySelector<T1, KEY> keySelector1;
-		private final TypeInformation<KEY> keyType;
-
-		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
-			this.keySelector1 = keySelector1;
-			this.keyType = keyType;
-		}
-	
-		/**
-		 * Specifies a {@link KeySelector} for elements from the second input.
-		 */
-		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
-			TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-			if (!otherKey.equals(this.keyType)) {
-				throw new IllegalArgumentException("The keys for the two inputs are not equal: " + 
-						"first key = " + this.keyType + " , second key = " + otherKey);
-			}
-			
-			return new EqualTo(input2.clean(keySelector));
-		}
-
-		// --------------------------------------------------------------------
-		
-		/**
-		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
-		 */
-		public class EqualTo {
-
-			private final KeySelector<T2, KEY> keySelector2;
-
-			EqualTo(KeySelector<T2, KEY> keySelector2) {
-				this.keySelector2 = requireNonNull(keySelector2);
-			}
-
-			/**
-			 * Specifies the window on which the co-group operation works.
-			 */
-			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
-	 * well as a {@link WindowAssigner}.
-	 *
-	 * @param <T1> Type of the elements from the first input
-	 * @param <T2> Type of the elements from the second input
-	 * @param <KEY> Type of the key. This must be the same for both inputs
-	 * @param <W> Type of {@link Window} on which the co-group operation works.
-	 */
-	public static class WithWindow<T1, T2, KEY, W extends Window> {
-		private final DataStream<T1> input1;
-		private final DataStream<T2> input2;
-
-		private final KeySelector<T1, KEY> keySelector1;
-		private final KeySelector<T2, KEY> keySelector2;
-		
-		private final TypeInformation<KEY> keyType;
-
-		private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
-
-		private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
-
-		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
-
-		protected WithWindow(DataStream<T1> input1,
-				DataStream<T2> input2,
-				KeySelector<T1, KEY> keySelector1,
-				KeySelector<T2, KEY> keySelector2,
-				TypeInformation<KEY> keyType,
-				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
-				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
-				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
-			this.input1 = input1;
-			this.input2 = input2;
-
-			this.keySelector1 = keySelector1;
-			this.keySelector2 = keySelector2;
-			this.keyType = keyType;
-			
-			this.windowAssigner = windowAssigner;
-			this.trigger = trigger;
-			this.evictor = evictor;
-		}
-
-		/**
-		 * Sets the {@code Trigger} that should be used to trigger window emission.
-		 */
-		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, newTrigger, evictor);
-		}
-
-		/**
-		 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
-		 *
-		 * <p>
-		 * Note: When using an evictor window performance will degrade significantly, since
-		 * pre-aggregation of window results cannot be used.
-		 */
-		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
-			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
-		}
-
-		/**
-		 * Completes the co-group operation with the user function that is executed
-		 * for windowed groups.
-		 */
-		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
-
-			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					CoGroupFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"CoGroup",
-					false);
-
-			return apply(function, resultType);
-		}
-
-		/**
-		 * Completes the co-group operation with the user function that is executed
-		 * for windowed groups.
-		 */
-		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
-			//clean the closure
-			function = input1.getExecutionEnvironment().clean(function);
-
-			UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
-			UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
-			
-			DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
-					.map(new Input1Tagger<T1, T2>())
-					.returns(unionType);
-			DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
-					.map(new Input2Tagger<T1, T2>())
-					.returns(unionType);
-
-			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
-			
-			// we explicitly create the keyed stream to manually pass the key type information in
-			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = 
-					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
-					.window(windowAssigner);
-
-			if (trigger != null) {
-				windowOp.trigger(trigger);
-			}
-			if (evictor != null) {
-				windowOp.evictor(evictor);
-			}
-
-			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Data type and type information for Tagged Union
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Internal class for implementing tagged union co-group.
-	 */
-	public static class TaggedUnion<T1, T2> {
-		private final T1 one;
-		private final T2 two;
-
-		private TaggedUnion(T1 one, T2 two) {
-			this.one = one;
-			this.two = two;
-		}
-
-		public boolean isOne() {
-			return one != null;
-		}
-
-		public boolean isTwo() {
-			return two != null;
-		}
-
-		public T1 getOne() {
-			return one;
-		}
-
-		public T2 getTwo() {
-			return two;
-		}
-
-		public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
-			return new TaggedUnion<>(one, null);
-		}
-
-		public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
-			return new TaggedUnion<>(null, two);
-		}
-	}
-
-	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
-		private static final long serialVersionUID = 1L;
-
-		TypeInformation<T1> oneType;
-		TypeInformation<T2> twoType;
-
-		public UnionTypeInfo(TypeInformation<T1> oneType,
-				TypeInformation<T2> twoType) {
-			this.oneType = oneType;
-			this.twoType = twoType;
-		}
-
-		@Override
-		public boolean isBasicType() {
-			return false;
-		}
-
-		@Override
-		public boolean isTupleType() {
-			return false;
-		}
-
-		@Override
-		public int getArity() {
-			return 2;
-		}
-
-		@Override
-		public int getTotalFields() {
-			return 2;
-		}
-
-		@Override
-		@SuppressWarnings("unchecked, rawtypes")
-		public Class<TaggedUnion<T1, T2>> getTypeClass() {
-			return (Class) TaggedUnion.class;
-		}
-
-		@Override
-		public boolean isKeyType() {
-			return true;
-		}
-
-		@Override
-		public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) {
-			return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config));
-		}
-
-		@Override
-		public String toString() {
-			return "TaggedUnion<" + oneType + ", " + twoType + ">";
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof UnionTypeInfo) {
-				@SuppressWarnings("unchecked")
-				UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj;
-
-				return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			return 31 *  oneType.hashCode() + twoType.hashCode();
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof UnionTypeInfo;
-		}
-	}
-
-	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
-		private static final long serialVersionUID = 1L;
-
-		private final TypeSerializer<T1> oneSerializer;
-		private final TypeSerializer<T2> twoSerializer;
-
-		public UnionSerializer(TypeSerializer<T1> oneSerializer,
-				TypeSerializer<T2> twoSerializer) {
-			this.oneSerializer = oneSerializer;
-			this.twoSerializer = twoSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
-			return this;
-		}
-
-		@Override
-		public TaggedUnion<T1, T2> createInstance() {
-			return null;
-		}
-
-		@Override
-		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
-			if (from.isOne()) {
-				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
-			} else {
-				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
-			}
-		}
-
-		@Override
-		public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) {
-			if (from.isOne()) {
-				return TaggedUnion.one(oneSerializer.copy(from.getOne()));
-			} else {
-				return TaggedUnion.two(twoSerializer.copy(from.getTwo()));
-			}		}
-
-		@Override
-		public int getLength() {
-			return -1;
-		}
-
-		@Override
-		public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException {
-			if (record.isOne()) {
-				target.writeByte(1);
-				oneSerializer.serialize(record.getOne(), target);
-			} else {
-				target.writeByte(2);
-				twoSerializer.serialize(record.getTwo(), target);
-			}
-		}
-
-		@Override
-		public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException {
-			byte tag = source.readByte();
-			if (tag == 1) {
-				return TaggedUnion.one(oneSerializer.deserialize(source));
-			} else {
-				return TaggedUnion.two(twoSerializer.deserialize(source));
-			}
-		}
-
-		@Override
-		public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse,
-				DataInputView source) throws IOException {
-			byte tag = source.readByte();
-			if (tag == 1) {
-				return TaggedUnion.one(oneSerializer.deserialize(source));
-			} else {
-				return TaggedUnion.two(twoSerializer.deserialize(source));
-			}
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			byte tag = source.readByte();
-			target.writeByte(tag);
-			if (tag == 1) {
-				oneSerializer.copy(source, target);
-			} else {
-				twoSerializer.copy(source, target);
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			return 31 * oneSerializer.hashCode() + twoSerializer.hashCode();
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public boolean equals(Object obj) {
-			if (obj instanceof UnionSerializer) {
-				UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj;
-
-				return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof UnionSerializer;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utility functions that implement the CoGroup logic based on the tagged
-	//  untion window reduce
-	// ------------------------------------------------------------------------
-	
-	private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public TaggedUnion<T1, T2> map(T1 value) throws Exception {
-			return TaggedUnion.one(value);
-		}
-	}
-
-	private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public TaggedUnion<T1, T2> map(T2 value) throws Exception {
-			return TaggedUnion.two(value);
-		}
-	}
-
-	private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
-		private static final long serialVersionUID = 1L;
-
-		private final KeySelector<T1, KEY> keySelector1;
-		private final KeySelector<T2, KEY> keySelector2;
-
-		public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
-				KeySelector<T2, KEY> keySelector2) {
-			this.keySelector1 = keySelector1;
-			this.keySelector2 = keySelector2;
-		}
-
-		@Override
-		public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
-			if (value.isOne()) {
-				return keySelector1.getKey(value.getOne());
-			} else {
-				return keySelector2.getKey(value.getTwo());
-			}
-		}
-	}
-
-	private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
-			extends WrappingFunction<CoGroupFunction<T1, T2, T>>
-			implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
-		
-		private static final long serialVersionUID = 1L;
-
-		public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
-			super(userFunction);
-		}
-
-		@Override
-		public void apply(KEY key,
-				W window,
-				Iterable<TaggedUnion<T1, T2>> values,
-				Collector<T> out) throws Exception {
-			
-			List<T1> oneValues = new ArrayList<>();
-			List<T2> twoValues = new ArrayList<>();
-			
-			for (TaggedUnion<T1, T2> val: values) {
-				if (val.isOne()) {
-					oneValues.add(val.getOne());
-				} else {
-					twoValues.add(val.getTwo());
-				}
-			}
-			wrappedFunction.coGroup(oneValues, twoValues, out);
-		}
-	}
-}