Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:45 UTC

[29/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
deleted file mode 100644
index ba8a55b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.util.List;
-
-import scala.Product;
-
-
-/**
- * These classes encapsulate the logic of accessing a field that the user specified either as a
- * position index or as a field expression string. TypeInformation can also be requested for the field.
- * The position index may refer to a field of a Tuple or an array; for a simple (non-tuple, non-array)
- * type, only position 0 is valid.
- */
-public abstract class FieldAccessor<R, F> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	TypeInformation fieldType;
-
-	// Note: Returns the corresponding basic type for arrays of primitive types (e.g., Integer for int[]).
-	@SuppressWarnings("unchecked")
-	public TypeInformation<F> getFieldType() {
-		return fieldType;
-	}
-
-
-	public abstract F get(R record);
-
-	// Note: This has to return the result, because the SimpleFieldAccessor might not be able to modify
-	// the record in place (for example, when R is simply Double); Java has no pass-by-reference.
-	public abstract R set(R record, F fieldValue);
-
-
-
-	@SuppressWarnings("unchecked")
-	public static <R, F> FieldAccessor<R, F> create(int pos, TypeInformation<R> typeInfo, ExecutionConfig config) {
-		if (typeInfo.isTupleType() && ((TupleTypeInfoBase)typeInfo).isCaseClass()) {
-			return new ProductFieldAccessor<R, F>(pos, typeInfo, config);
-		} else if (typeInfo.isTupleType()) {
-			return new TupleFieldAccessor<R, F>(pos, typeInfo);
-		} else if (typeInfo instanceof BasicArrayTypeInfo || typeInfo instanceof PrimitiveArrayTypeInfo) {
-			return new ArrayFieldAccessor<R, F>(pos, typeInfo);
-		} else {
-			if (pos != 0) {
-				throw new IndexOutOfBoundsException("Only the 0th field can be selected for a simple type (non-tuple, non-array).");
-			}
-			return (FieldAccessor<R, F>) new SimpleFieldAccessor<R>(typeInfo);
-		}
-	}
-
-	public static <R, F> FieldAccessor<R, F> create(String field, TypeInformation<R> typeInfo, ExecutionConfig config) {
-		if (typeInfo.isTupleType() && ((TupleTypeInfoBase)typeInfo).isCaseClass()) {
-			int pos = ((TupleTypeInfoBase)typeInfo).getFieldIndex(field);
-			if (pos == -2) {
-				throw new RuntimeException("Invalid field selected: " + field);
-			}
-			return new ProductFieldAccessor<R, F>(pos, typeInfo, config);
-		} else if (typeInfo.isTupleType()) {
-			return new TupleFieldAccessor<R, F>(((TupleTypeInfo)typeInfo).getFieldIndex(field), typeInfo);
-		} else {
-			return new PojoFieldAccessor<R, F>(field, typeInfo, config);
-		}
-	}
-
-
-
-	public static class SimpleFieldAccessor<R> extends FieldAccessor<R, R> {
-
-		private static final long serialVersionUID = 1L;
-
-		SimpleFieldAccessor(TypeInformation<R> typeInfo) {
-			this.fieldType = typeInfo;
-		}
-
-		@Override
-		public R get(R record) {
-			return record;
-		}
-
-		@Override
-		public R set(R record, R fieldValue) {
-			return fieldValue;
-		}
-	}
-
-	public static class ArrayFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
-		private static final long serialVersionUID = 1L;
-
-		int pos;
-
-		ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
-			this.pos = pos;
-			this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public F get(R record) {
-			return (F) Array.get(record, pos);
-		}
-
-		@Override
-		public R set(R record, F fieldValue) {
-			Array.set(record, pos, fieldValue);
-			return record;
-		}
-	}
-
-	public static class TupleFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
-		private static final long serialVersionUID = 1L;
-
-		int pos;
-
-		TupleFieldAccessor(int pos, TypeInformation<R> typeInfo) {
-			this.pos = pos;
-			this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public F get(R record) {
-			Tuple tuple = (Tuple) record;
-			return (F)tuple.getField(pos);
-		}
-
-		@Override
-		public R set(R record, F fieldValue) {
-			Tuple tuple = (Tuple) record;
-			tuple.setField(fieldValue, pos);
-			return record;
-		}
-	}
-
-	public static class PojoFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
-		private static final long serialVersionUID = 1L;
-
-		PojoComparator comparator;
-
-		PojoFieldAccessor(String field, TypeInformation<R> type, ExecutionConfig config) {
-			if (!(type instanceof CompositeType<?>)) {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types and Tuples. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-
-			@SuppressWarnings("unchecked")
-			CompositeType<R> cType = (CompositeType<R>) type;
-
-			List<CompositeType.FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
-
-			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
-			this.fieldType = fieldDescriptors.get(0).getType();
-			Class<?> keyClass = fieldType.getTypeClass();
-
-			if (cType instanceof PojoTypeInfo) {
-				comparator = (PojoComparator<R>) cType.createComparator(
-						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
-			} else {
-				throw new IllegalArgumentException(
-						"Key expressions are only supported on POJO types. "
-								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public F get(R record) {
-			return (F) comparator.accessField(comparator.getKeyFields()[0], record);
-		}
-
-		@Override
-		public R set(R record, F fieldValue) {
-			try {
-				comparator.getKeyFields()[0].set(record, fieldValue);
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Could not modify the specified field.", e);
-			}
-			return record;
-		}
-	}
-
-	public static class ProductFieldAccessor<R, F> extends FieldAccessor<R, F> {
-
-		private static final long serialVersionUID = 1L;
-
-		int pos;
-		TupleSerializerBase<R> serializer;
-		Object[] fields;
-		int length;
-
-		ProductFieldAccessor(int pos, TypeInformation<R> typeInfo, ExecutionConfig config) {
-			this.pos = pos;
-			this.fieldType = ((TupleTypeInfoBase<R>)typeInfo).getTypeAt(pos);
-			this.serializer = (TupleSerializerBase<R>)typeInfo.createSerializer(config);
-			this.length = this.serializer.getArity();
-			this.fields = new Object[this.length];
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public F get(R record) {
-			return (F)((Product)record).productElement(pos);
-		}
-
-		@Override
-		public R set(R record, F fieldValue) {
-			Product prod = (Product)record;
-			for (int i = 0; i < length; i++) {
-				fields[i] = prod.productElement(i);
-			}
-			fields[pos] = fieldValue;
-			return serializer.createInstance(fields);
-		}
-	}
-}
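
A minimal usage sketch of the FieldAccessor API removed above, assuming a Tuple2 record; the TypeExtractor/ExecutionConfig setup and the concrete values are illustrative, not part of the removed file:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.FieldAccessor;

public class FieldAccessorSketch {
	public static void main(String[] args) {
		ExecutionConfig config = new ExecutionConfig();
		TypeInformation<Tuple2<String, Integer>> typeInfo =
				TypeExtractor.getForObject(Tuple2.of("a", 1));

		// Select field 1 of the tuple by position index.
		FieldAccessor<Tuple2<String, Integer>, Integer> accessor =
				FieldAccessor.create(1, typeInfo, config);

		Tuple2<String, Integer> record = Tuple2.of("a", 1);
		Integer value = accessor.get(record); // 1
		// set() returns the (possibly new) record, since some record types
		// cannot be modified in place.
		record = accessor.set(record, 42);
	}
}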

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
deleted file mode 100644
index afbd8ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import java.lang.reflect.Array;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Utility class with helper methods for manipulating {@link KeySelector}s in streaming programs.
- */
-public final class KeySelectorUtil {
-
-	public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
-		if (!(typeInfo instanceof CompositeType)) {
-			throw new InvalidTypesException(
-					"This key operation requires a composite type such as Tuples, POJOs, or Case Classes.");
-		}
-
-		CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
-		
-		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-		int numKeyFields = logicalKeyPositions.length;
-		
-		// use ascending order here, the code paths for that are usually a slight bit faster
-		boolean[] orders = new boolean[numKeyFields];
-		TypeInformation<?>[] typeInfos = new TypeInformation<?>[numKeyFields];
-		for (int i = 0; i < numKeyFields; i++) {
-			orders[i] = true;
-			typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
-		}
-
-		TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
-		return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
-	}
-
-	public static <X> ArrayKeySelector<X> getSelectorForArray(int[] positions, TypeInformation<X> typeInfo) {
-		if (positions == null || positions.length == 0 || positions.length > Tuple.MAX_ARITY) {
-			throw new IllegalArgumentException("Array keys must have between 1 and " + Tuple.MAX_ARITY + " fields.");
-		}
-		
-		TypeInformation<?> componentType;
-		
-		if (typeInfo instanceof BasicArrayTypeInfo) {
-			BasicArrayTypeInfo<X, ?>  arrayInfo = (BasicArrayTypeInfo<X, ?>) typeInfo;
-			componentType = arrayInfo.getComponentInfo();
-		}
-		else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
-			PrimitiveArrayTypeInfo<X> arrayType = (PrimitiveArrayTypeInfo<X>) typeInfo;
-			componentType = arrayType.getComponentType();
-		}
-		else {
-			throw new IllegalArgumentException("This method only supports arrays of primitives and boxed primitives.");
-		}
-		
-		TypeInformation<?>[] primitiveInfos = new TypeInformation<?>[positions.length];
-		Arrays.fill(primitiveInfos, componentType);
-
-		return new ArrayKeySelector<>(positions, new TupleTypeInfo<>(primitiveInfos));
-	}
-
-	
-	public static <X, K> KeySelector<X, K> getSelectorForOneKey(
-			Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
-	{
-		if (!(typeInfo instanceof CompositeType)) {
-			throw new InvalidTypesException(
-					"This key operation requires a composite type such as Tuples, POJOs, case classes, etc");
-		}
-		if (partitioner != null) {
-			keys.validateCustomPartitioner(partitioner, null);
-		}
-
-		CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
-		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-		if (logicalKeyPositions.length != 1) {
-			throw new IllegalArgumentException("There must be exactly 1 key specified");
-		}
-		
-		TypeComparator<X> comparator = compositeType.createComparator(
-				logicalKeyPositions, new boolean[] { true }, 0, executionConfig);
-		return new OneKeySelector<>(comparator);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private KeySelectorUtil() {
-		throw new RuntimeException();
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Key selector that extracts a single key field via a generic comparator.
-	 * 
-	 * @param <IN> The type of the elements where the key is extracted from.
-	 * @param <K> The type of the key.
-	 */
-	public static final class OneKeySelector<IN, K> implements KeySelector<IN, K> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final TypeComparator<IN> comparator;
-
-		/** Reusable array to hold the key objects. Since this is initially empty (all positions
-		 * are null), it does not have any serialization problems */
-		@SuppressWarnings("NonSerializableFieldInSerializableClass")
-		private final Object[] keyArray;
-		
-		OneKeySelector(TypeComparator<IN> comparator) {
-			this.comparator = comparator;
-			this.keyArray = new Object[1];
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public K getKey(IN value) throws Exception {
-			comparator.extractKeys(value, keyArray, 0);
-			return (K) keyArray[0];
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * A key selector for selecting key fields via a TypeComparator.
-	 *
-	 * @param <IN> The type from which the key is extracted.
-	 */
-	public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final TypeComparator<IN> comparator;
-		private final int keyLength;
-		private transient TupleTypeInfo<Tuple> tupleTypeInfo;
-
-		/** Reusable array to hold the key objects. Since this is initially empty (all positions
-		 * are null), it does not have any serialization problems */
-		@SuppressWarnings("NonSerializableFieldInSerializableClass")
-		private final Object[] keyArray;
-
-		ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo<Tuple> tupleTypeInfo) {
-			this.comparator = comparator;
-			this.keyLength = keyLength;
-			this.tupleTypeInfo = tupleTypeInfo;
-			this.keyArray = new Object[keyLength];
-		}
-
-		@Override
-		public Tuple getKey(IN value) throws Exception {
-			Tuple key = Tuple.getTupleClass(keyLength).newInstance();
-			comparator.extractKeys(value, keyArray, 0);
-			for (int i = 0; i < keyLength; i++) {
-				key.setField(keyArray[i], i);
-			}
-			return key;
-		}
-
-		@Override
-		public TypeInformation<Tuple> getProducedType() {
-			if (tupleTypeInfo == null) {
-				throw new IllegalStateException("The return type information is not available after serialization");
-			}
-			return tupleTypeInfo;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * A key selector that selects individual array fields as keys and returns them as a Tuple.
-	 * 
-	 * @param <IN> The type from which the key is extracted, i.e., the array type.
-	 */
-	public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
-
-		private static final long serialVersionUID = 1L;
-		
-		private final int[] fields;
-		private final Class<? extends Tuple> tupleClass;
-		private transient TupleTypeInfo<Tuple> returnType;
-
-		ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
-			this.fields = requireNonNull(fields);
-			this.returnType = requireNonNull(returnType);
-			this.tupleClass = Tuple.getTupleClass(fields.length);
-		}
-
-		@Override
-		public Tuple getKey(IN value) throws Exception {
-			Tuple key = tupleClass.newInstance();
-			for (int i = 0; i < fields.length; i++) {
-				key.setField(Array.get(value, fields[i]), i);
-			}
-			return key;
-		}
-
-		@Override
-		public TypeInformation<Tuple> getProducedType() {
-			if (returnType == null) {
-				throw new IllegalStateException("The return type information is not available after serialization");
-			}
-			return returnType;
-		}
-	}
-}
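
A minimal sketch of obtaining and applying a selector via getSelectorForKeys; the same construction pattern appears in AggregationFunctionTest further below, and the Tuple2 setup here is illustrative:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;

public class KeySelectorUtilSketch {
	public static void main(String[] args) throws Exception {
		ExecutionConfig config = new ExecutionConfig();
		TypeInformation<Tuple2<Integer, Integer>> typeInfo =
				TypeExtractor.getForObject(Tuple2.of(0, 0));

		// Key on field 0; the selector returns the key fields wrapped in a Tuple.
		KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector =
				KeySelectorUtil.getSelectorForKeys(
						new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
						typeInfo, config);

		Tuple key = keySelector.getKey(Tuple2.of(7, 42)); // Tuple1 containing 7
	}
}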

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
deleted file mode 100644
index f0e4477..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * The deserialization schema describes how to turn the byte messages delivered by certain
- * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
- * processed by Flink.
- * 
- * @param <T> The type created by the deserialization schema.
- */
-public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
-
-	/**
-	 * Deserializes the byte message.
-	 * 
-	 * @param message The message, as a byte array.
-	 * @return The deserialized message as an object.
-	 */
-	T deserialize(byte[] message);
-
-	/**
-	 * Method to decide whether the element signals the end of the stream. If
-	 * true is returned, the element won't be emitted.
-	 * 
-	 * @param nextElement The element to test for the end-of-stream signal.
-	 * @return True, if the element signals end of stream, false otherwise.
-	 */
-	boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
deleted file mode 100644
index ebb785c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return nextElement.equals("q");
-	}
-
-	@Override
-	public byte[] serialize(String element) {
-		return SerializationUtils.serialize(element);
-	}
-
-	@Override
-	public String deserialize(byte[] message) {
-		return SerializationUtils.deserialize(message);
-	}
-
-	@Override
-	public TypeInformation<String> getProducedType() {
-		return BasicTypeInfo.STRING_TYPE_INFO;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
deleted file mode 100644
index 4d9aaee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * A "no-op" serialization and deserialization schema for byte strings. The serialized representation is
- * identical to the original representation.
- * 
- * <p>This schema never considers a byte string to signal end-of-stream.</p>
- */
-public class RawSchema implements DeserializationSchema<byte[]>, SerializationSchema<byte[], byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public byte[] deserialize(byte[] message) {
-		return message;
-	}
-
-	@Override
-	public boolean isEndOfStream(byte[] nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(byte[] element) {
-		return element;
-	}
-
-	@Override
-	public TypeInformation<byte[]> getProducedType() {
-		return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
deleted file mode 100644
index 21342b2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-/**
- * The serialization schema describes how to turn a data object into a different serialized
- * representation. Most data sinks (for example Apache Kafka) require the data to be handed
- * to them in a specific format (for example as byte strings).
- * 
- * @param <T> The type to be serialized.
- * @param <R> The serialized representation type.
- */
-public interface SerializationSchema<T, R> extends Serializable {
-
-	/**
-	 * Serializes the incoming element to a specified type.
-	 * 
-	 * @param element
-	 *            The incoming element to be serialized
-	 * @return The serialized element.
-	 */
-	R serialize(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
deleted file mode 100644
index 51d2d7f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
-		SerializationSchema<String, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public String deserialize(byte[] message) {
-		return new String(message);
-	}
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return false;
-	}
-
-	@Override
-	public String serialize(String element) {
-		return element;
-	}
-
-	@Override
-	public TypeInformation<String> getProducedType() {
-		return BasicTypeInfo.STRING_TYPE_INFO;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
deleted file mode 100644
index 6ff9712..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-
-/**
- * A serialization and deserialization schema that uses Flink's serialization stack to
- * transform typed elements from and to byte arrays.
- * 
- * @param <T> The type to be serialized.
- */
-public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
-	
-	private static final long serialVersionUID = -5359448468131559102L;
-	
-	/** The serializer for the actual de-/serialization */
-	private final TypeSerializer<T> serializer;
-
-	/** The reusable output serialization buffer */
-	private transient DataOutputSerializer dos;
-
-	/** The type information, to be returned by {@link #getProducedType()}. It is
-	 * transient, because it is not serializable. Note that this means that the type information
-	 * is not available at runtime, but only prior to the first serialization / deserialization */
-	private transient TypeInformation<T> typeInfo;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a new de-/serialization schema for the given type.
-	 * 
-	 * @param typeInfo The type information for the type de-/serialized by this schema.
-	 * @param ec The execution config, which is used to parametrize the type serializers.
-	 */
-	public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
-		this.typeInfo = typeInfo;
-		this.serializer = typeInfo.createSerializer(ec);
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public T deserialize(byte[] message) {
-		try {
-			return serializer.deserialize(new ByteArrayInputView(message));
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Unable to deserialize message", e);
-		}
-	}
-
-	/**
-	 * This schema never considers an element to signal end-of-stream, so this method always returns false.
-	 * @param nextElement The element to test for the end-of-stream signal.
-	 * @return Returns false.
-	 */
-	@Override
-	public boolean isEndOfStream(T nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(T element) {
-		if (dos == null) {
-			dos = new DataOutputSerializer(16);
-		}
-		
-		try {
-			serializer.serialize(element, dos);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Unable to serialize record", e);
-		}
-		
-		byte[] ret = dos.getByteArray();
-		if (ret.length != dos.length()) {
-			byte[] n = new byte[dos.length()];
-			System.arraycopy(ret, 0, n, 0, dos.length());
-			ret = n;
-		}
-		dos.clear();
-		return ret;
-	}
-
-	@Override
-	public TypeInformation<T> getProducedType() {
-		if (typeInfo != null) {
-			return typeInfo;
-		}
-		else {
-			throw new IllegalStateException(
-					"The type information is not available after this class has been serialized and distributed.");
-		}
-	}
-}
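
A minimal round-trip sketch for this schema, assuming a String payload (the type choice and setup are illustrative):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class TypeInfoSchemaSketch {
	public static void main(String[] args) {
		TypeInformationSerializationSchema<String> schema =
				new TypeInformationSerializationSchema<>(
						BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());

		// serialize() trims the reusable buffer to the exact record size,
		// so the round trip returns an equal value.
		byte[] bytes = schema.serialize("hello");
		String back = schema.deserialize(bytes); // "hello"
	}
}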

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
deleted file mode 100644
index 1187fe6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-// We have it in this package because we could not mock the methods otherwise
-package org.apache.flink.runtime.io.network.partition.consumer;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Test {@link InputGate} that allows setting multiple channels. Use
- * {@link #sendElement(Object, int)} to offer an element on a specific channel. Use
- * {@link #sendEvent(AbstractEvent, int)} to offer an event on the specified channel. Use
- * {@link #endInput()} to notify all channels of input end.
- */
-public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
-
-	private final int numInputChannels;
-
-	private final TestInputChannel[] inputChannels;
-
-	private final int bufferSize;
-
-	private TypeSerializer<T> serializer;
-
-	private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;
-
-	@SuppressWarnings("unchecked")
-	public StreamTestSingleInputGate(
-			int numInputChannels,
-			int bufferSize,
-			TypeSerializer<T> serializer) throws IOException, InterruptedException {
-		super(numInputChannels, false);
-
-		this.bufferSize = bufferSize;
-		this.serializer = serializer;
-
-		this.numInputChannels = numInputChannels;
-		inputChannels = new TestInputChannel[numInputChannels];
-
-		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
-
-		setupInputChannels();
-		doReturn(bufferSize).when(inputGate).getPageSize();
-	}
-
-	@SuppressWarnings("unchecked")
-	private void setupInputChannels() throws IOException, InterruptedException {
-
-		for (int i = 0; i < numInputChannels; i++) {
-			final int channelIndex = i;
-			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
-			final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
-					new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer));
-
-			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
-			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
-
-
-			final Answer<Buffer> answer = new Answer<Buffer>() {
-				@Override
-				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					InputValue<Object> input = inputQueues[channelIndex].poll();
-					if (input != null && input.isStreamEnd()) {
-						when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
-								true);
-						return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-					}
-					else if (input != null && input.isStreamRecord()) {
-						Object inputElement = input.getStreamRecord();
-						final Buffer buffer = new Buffer(
-								MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-								mock(BufferRecycler.class));
-						
-						recordSerializer.setNextBuffer(buffer);
-						delegate.setInstance(inputElement);
-						recordSerializer.addRecord(delegate);
-
-						// Call getCurrentBuffer to ensure size is set
-						return recordSerializer.getCurrentBuffer();
-					}
-					else if (input != null && input.isEvent()) {
-						AbstractEvent event = input.getEvent();
-						return EventSerializer.toBuffer(event);
-					}
-					else {
-						synchronized (inputQueues[channelIndex]) {
-							inputQueues[channelIndex].wait();
-							return answer(invocationOnMock);
-						}
-					}
-				}
-			};
-
-			when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
-
-			inputGate.setInputChannel(new IntermediateResultPartitionID(),
-					inputChannels[channelIndex].getInputChannel());
-		}
-	}
-
-	public void sendElement(Object element, int channel) {
-		synchronized (inputQueues[channel]) {
-			inputQueues[channel].add(InputValue.element(element));
-			inputQueues[channel].notifyAll();
-		}
-		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
-	}
-
-	public void sendEvent(AbstractEvent event, int channel) {
-		synchronized (inputQueues[channel]) {
-			inputQueues[channel].add(InputValue.event(event));
-			inputQueues[channel].notifyAll();
-		}
-		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
-	}
-
-	public void endInput() {
-		for (int i = 0; i < numInputChannels; i++) {
-			synchronized (inputQueues[i]) {
-				inputQueues[i].add(InputValue.streamEnd());
-				inputQueues[i].notifyAll();
-			}
-			inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
-		}
-	}
-
-	/**
-	 * Returns true iff all input queues are empty.
-	 */
-	public boolean allQueuesEmpty() {
-//		for (int i = 0; i < numInputChannels; i++) {
-//			synchronized (inputQueues[i]) {
-//				inputQueues[i].add(InputValue.<T>event(new DummyEvent()));
-//				inputQueues[i].notifyAll();
-//				inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
-//			}
-//		}
-
-		for (int i = 0; i < numInputChannels; i++) {
-			if (inputQueues[i].size() > 0) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	public static class InputValue<T> {
-		private Object elementOrEvent;
-		private boolean isStreamEnd;
-		private boolean isStreamRecord;
-		private boolean isEvent;
-
-		private InputValue(Object elementOrEvent, boolean isStreamEnd, boolean isEvent, boolean isStreamRecord) {
-			this.elementOrEvent = elementOrEvent;
-			this.isStreamEnd = isStreamEnd;
-			this.isStreamRecord = isStreamRecord;
-			this.isEvent = isEvent;
-		}
-
-		public static <X> InputValue<X> element(Object element) {
-			return new InputValue<X>(element, false, false, true);
-		}
-
-		public static <X> InputValue<X> streamEnd() {
-			return new InputValue<X>(null, true, false, false);
-		}
-
-		public static <X> InputValue<X> event(AbstractEvent event) {
-			return new InputValue<X>(event, false, true, false);
-		}
-
-		public Object getStreamRecord() {
-			return elementOrEvent;
-		}
-
-		public AbstractEvent getEvent() {
-			return (AbstractEvent) elementOrEvent;
-		}
-
-		public boolean isStreamEnd() {
-			return isStreamEnd;
-		}
-
-		public boolean isStreamRecord() {
-			return isStreamRecord;
-		}
-
-		public boolean isEvent() {
-			return isEvent;
-		}
-	}
-}
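
A minimal sketch of driving this mock gate from a test, assuming elements are offered as StreamRecords (which matches the MultiplexingStreamRecordSerializer set up above); the channel count and buffer size are illustrative:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class InputGateSketch {
	public static void main(String[] args) throws Exception {
		TypeSerializer<String> serializer =
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());

		// Two input channels with 4 KB mock buffers.
		StreamTestSingleInputGate<String> gate =
				new StreamTestSingleInputGate<>(2, 4096, serializer);

		gate.sendElement(new StreamRecord<>("a"), 0); // element on channel 0
		gate.sendElement(new StreamRecord<>("b"), 1); // element on channel 1
		gate.endInput();                              // end-of-stream on all channels
	}
}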

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
deleted file mode 100644
index dd8dec9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-import org.junit.Test;
-
-public class AggregationFunctionTest {
-
-	@Test
-	public void groupSumIntegerTest() {
-
-		// preparing expected outputs
-		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
-		List<Tuple2<Integer, Integer>> expectedGroupMinList = new ArrayList<>();
-		List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<>();
-
-		int groupedSum0 = 0;
-		int groupedSum1 = 0;
-		int groupedSum2 = 0;
-
-		for (int i = 0; i < 9; i++) {
-			int groupedSum;
-			switch (i % 3) {
-				case 0:
-					groupedSum = groupedSum0 += i;
-					break;
-				case 1:
-					groupedSum = groupedSum1 += i;
-					break;
-				default:
-					groupedSum = groupedSum2 += i;
-					break;
-			}
-
-			expectedGroupSumList.add(new Tuple2<>(i % 3, groupedSum));
-			expectedGroupMinList.add(new Tuple2<>(i % 3, i % 3));
-			expectedGroupMaxList.add(new Tuple2<>(i % 3, i));
-		}
-
-		// some necessary boilerplate
-		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
-
-		ExecutionConfig config = new ExecutionConfig();
-
-		KeySelector<Tuple2<Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
-				typeInfo, config);
-		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
-		// aggregations tested
-		ReduceFunction<Tuple2<Integer, Integer>> sumFunction =
-				new SumAggregator<>(1, typeInfo, config);
-		ReduceFunction<Tuple2<Integer, Integer>> minFunction = new ComparableAggregator<>(
-				1, typeInfo, AggregationType.MIN, config);
-		ReduceFunction<Tuple2<Integer, Integer>> maxFunction = new ComparableAggregator<>(
-				1, typeInfo, AggregationType.MAX, config);
-
-		List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
-				getInputList(),
-				keySelector, keyType);
-
-		List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
-				getInputList(),
-				keySelector, keyType);
-
-		List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
-				getInputList(),
-				keySelector, keyType);
-
-		assertEquals(expectedGroupSumList, groupedSumList);
-		assertEquals(expectedGroupMinList, groupedMinList);
-		assertEquals(expectedGroupMaxList, groupedMaxList);
-	}
-
-	@Test
-	public void pojoGroupSumIntegerTest() {
-
-		// preparing expected outputs
-		List<MyPojo> expectedGroupSumList = new ArrayList<>();
-		List<MyPojo> expectedGroupMinList = new ArrayList<>();
-		List<MyPojo> expectedGroupMaxList = new ArrayList<>();
-
-		int groupedSum0 = 0;
-		int groupedSum1 = 0;
-		int groupedSum2 = 0;
-
-		for (int i = 0; i < 9; i++) {
-			int groupedSum;
-			switch (i % 3) {
-				case 0:
-					groupedSum = groupedSum0 += i;
-					break;
-				case 1:
-					groupedSum = groupedSum1 += i;
-					break;
-				default:
-					groupedSum = groupedSum2 += i;
-					break;
-			}
-
-			expectedGroupSumList.add(new MyPojo(i % 3, groupedSum));
-			expectedGroupMinList.add(new MyPojo(i % 3, i % 3));
-			expectedGroupMaxList.add(new MyPojo(i % 3, i));
-		}
-
-		// some necessary boilerplate
-		TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(0, 0));
-
-		ExecutionConfig config = new ExecutionConfig();
-
-		KeySelector<MyPojo, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
-				typeInfo, config);
-		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
-		// aggregations tested
-		ReduceFunction<MyPojo> sumFunction = new SumAggregator<>("f1", typeInfo, config);
-		ReduceFunction<MyPojo> minFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MIN,
-				false, config);
-		ReduceFunction<MyPojo> maxFunction = new ComparableAggregator<>("f1", typeInfo, AggregationType.MAX,
-				false, config);
-
-		List<MyPojo> groupedSumList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
-				getInputPojoList(),
-				keySelector, keyType);
-		
-		List<MyPojo> groupedMinList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
-				getInputPojoList(),
-				keySelector, keyType);
-
-		List<MyPojo> groupedMaxList = MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
-				getInputPojoList(),
-				keySelector, keyType);
-
-		assertEquals(expectedGroupSumList, groupedSumList);
-		assertEquals(expectedGroupMinList, groupedMinList);
-		assertEquals(expectedGroupMaxList, groupedMaxList);
-	}
-	
-	@Test
-	public void minMaxByTest() {
-		// Tuples are grouped on field 0, aggregated on field 1
-		
-		// preparing expected outputs
-		List<Tuple3<Integer, Integer, Integer>> maxByFirstExpected = ImmutableList.of(
-				Tuple3.of(0,0,0), Tuple3.of(0,1,1), Tuple3.of(0,2,2),
-				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2),
-				Tuple3.of(0,2,2), Tuple3.of(0,2,2), Tuple3.of(0,2,2));
-
-		List<Tuple3<Integer, Integer, Integer>> maxByLastExpected = ImmutableList.of(
-				Tuple3.of(0, 0, 0), Tuple3.of(0, 1, 1), Tuple3.of(0, 2, 2),
-				Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 2), Tuple3.of(0, 2, 5),
-				Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 5), Tuple3.of(0, 2, 8));
-
-		List<Tuple3<Integer, Integer, Integer>> minByFirstExpected = ImmutableList.of(
-				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
-				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0),
-				Tuple3.of(0,0,0), Tuple3.of(0,0,0), Tuple3.of(0,0,0));
-
-		List<Tuple3<Integer, Integer, Integer>> minByLastExpected = ImmutableList.of(
-				Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0), Tuple3.of(0, 0, 0),
-				Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3), Tuple3.of(0, 0, 3),
-				Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6), Tuple3.of(0, 0, 6));
-
-		// some necessary boilerplate
-		TypeInformation<Tuple3<Integer, Integer, Integer>> typeInfo = TypeExtractor
-				.getForObject(Tuple3.of(0,0,0));
-
-		ExecutionConfig config = new ExecutionConfig();
-
-		KeySelector<Tuple3<Integer, Integer, Integer>, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<>(new int[]{0}, typeInfo),
-				typeInfo, config);
-		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-		
-		// aggregations tested
-		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionFirst =
-				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, true, config);
-		ReduceFunction<Tuple3<Integer, Integer, Integer>> maxByFunctionLast =
-				new ComparableAggregator<>(1, typeInfo, AggregationType.MAXBY, false, config);
-		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionFirst =
-				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, true, config);
-		ReduceFunction<Tuple3<Integer, Integer, Integer>> minByFunctionLast =
-				new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
-
-		assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByList(),
-				keySelector, keyType));
-		
-		assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByList(),
-				keySelector, keyType));
-		
-		assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByList(),
-				keySelector, keyType));
-		
-		assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByList(),
-				keySelector, keyType));
-	}
-
-	@Test
-	public void pojoMinMaxByTest() {
-		// Pojos are grouped on field 0, aggregated on field 1
-
-		// preparing expected outputs
-		List<MyPojo3> maxByFirstExpected = ImmutableList.of(
-				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
-				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2),
-				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 2));
-
-		List<MyPojo3> maxByLastExpected = ImmutableList.of(
-				new MyPojo3(0, 0), new MyPojo3(1, 1), new MyPojo3(2, 2),
-				new MyPojo3(2, 2), new MyPojo3(2, 2), new MyPojo3(2, 5),
-				new MyPojo3(2, 5), new MyPojo3(2, 5), new MyPojo3(2, 8));
-
-		List<MyPojo3> minByFirstExpected = ImmutableList.of(
-				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
-				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
-				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0));
-
-		List<MyPojo3> minByLastExpected = ImmutableList.of(
-				new MyPojo3(0, 0), new MyPojo3(0, 0), new MyPojo3(0, 0),
-				new MyPojo3(0, 3), new MyPojo3(0, 3), new MyPojo3(0, 3),
-				new MyPojo3(0, 6), new MyPojo3(0, 6), new MyPojo3(0, 6));
-
-		// some necessary boilerplate
-		TypeInformation<MyPojo3> typeInfo = TypeExtractor.getForObject(new MyPojo3(0, 0));
-
-		ExecutionConfig config = new ExecutionConfig();
-
-		KeySelector<MyPojo3, Tuple> keySelector = KeySelectorUtil.getSelectorForKeys(
-				new Keys.ExpressionKeys<>(new String[]{"f0"}, typeInfo),
-				typeInfo, config);
-		TypeInformation<Tuple> keyType = TypeExtractor.getKeySelectorTypes(keySelector, typeInfo);
-
-		// aggregations tested
-		ReduceFunction<MyPojo3> maxByFunctionFirst =
-				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, true, config);
-		ReduceFunction<MyPojo3> maxByFunctionLast =
-				new ComparableAggregator<>("f1", typeInfo, AggregationType.MAXBY, false, config);
-		ReduceFunction<MyPojo3> minByFunctionFirst =
-				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, true, config);
-		ReduceFunction<MyPojo3> minByFunctionLast =
-				new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
-
-		assertEquals(maxByFirstExpected, MockContext.createAndExecuteForKeyedStream(
-						new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
-						getInputByPojoList(),
-						keySelector, keyType));
-		
-		assertEquals(maxByLastExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByPojoList(),
-				keySelector, keyType));
-		
-		assertEquals(minByLastExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
-				getInputByPojoList(),
-				keySelector, keyType));
-
-		assertEquals(minByFirstExpected, MockContext.createAndExecuteForKeyedStream(
-				new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
-				getInputByPojoList(),
-				keySelector, keyType));
-	}
-
-	// *************************************************************************
-	//     UTILS
-	// *************************************************************************
-
-	private List<Tuple2<Integer, Integer>> getInputList() {
-		ArrayList<Tuple2<Integer, Integer>> inputList = new ArrayList<>();
-		for (int i = 0; i < 9; i++) {
-			inputList.add(Tuple2.of(i % 3, i));
-		}
-		return inputList;
-	}
-
-	private List<MyPojo> getInputPojoList() {
-		ArrayList<MyPojo> inputList = new ArrayList<>();
-		for (int i = 0; i < 9; i++) {
-			inputList.add(new MyPojo(i % 3, i));
-		}
-		return inputList;
-	}
-
-	private List<Tuple3<Integer, Integer, Integer>> getInputByList() {
-		ArrayList<Tuple3<Integer, Integer, Integer>> inputList = new ArrayList<>();
-		for (int i = 0; i < 9; i++) {
-			inputList.add(Tuple3.of(0, i % 3, i));
-		}
-		return inputList;
-	}
-
-	private List<MyPojo3> getInputByPojoList() {
-		ArrayList<MyPojo3> inputList = new ArrayList<>();
-		for (int i = 0; i < 9; i++) {
-			inputList.add(new MyPojo3(i % 3, i));
-		}
-		return inputList;
-	}
-
-	public static class MyPojo implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		public int f0;
-		public int f1;
-
-		public MyPojo(int f0, int f1) {
-			this.f0 = f0;
-			this.f1 = f1;
-		}
-
-		public MyPojo() {
-		}
-
-		@Override
-		public String toString() {
-			return "POJO(" + f0 + "," + f1 + ")";
-		}
-
-		@Override
-		public boolean equals(Object other) {
-			if (other instanceof MyPojo) {
-				return this.f0 == ((MyPojo) other).f0 && this.f1 == ((MyPojo) other).f1;
-			} else {
-				return false;
-			}
-		}
-
-		// equals is overridden, so keep hashCode consistent with it
-		@Override
-		public int hashCode() {
-			return 31 * f0 + f1;
-		}
-	}
-
-	public static class MyPojo3 implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-		public int f0;
-		public int f1;
-		public int f2;
-
-		// f0 is left at its default of 0, so keying on it puts every record into a single group
-		public MyPojo3(int f1, int f2) {
-			this.f1 = f1;
-			this.f2 = f2;
-		}
-
-		public MyPojo3() {
-		}
-
-		@Override
-		public String toString() {
-			return "POJO3(" + f0 + "," + f1 + "," + f2 + ")";
-		}
-
-		@Override
-		public boolean equals(Object other) {
-			if (other instanceof MyPojo3) {
-				return this.f0 == ((MyPojo3) other).f0
-						&& this.f1 == ((MyPojo3) other).f1
-						&& this.f2 == ((MyPojo3) other).f2;
-			} else {
-				return false;
-			}
-		}
-
-		// equals is overridden, so keep hashCode consistent with it
-		@Override
-		public int hashCode() {
-			return 31 * (31 * f0 + f1) + f2;
-		}
-	}
-}
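
For reference, the minBy/maxBy semantics pinned down above surface directly in the
DataStream API. A minimal usage sketch, assuming an existing StreamExecutionEnvironment
'env' and the nine-element input built by getInputByList():

    DataStream<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(getInputByList());

    source
        .keyBy(0)          // group on field 0, as the tests above do
        .maxBy(1, true)    // emit the whole record holding the max of field 1;
                           // 'true' keeps the first such record on ties, 'false' the latest
        .print();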

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
deleted file mode 100644
index 68a047c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertNotEquals;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase {
-	private static RuntimeContext srcContext;
-	private static RuntimeContext mapContext;
-
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
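-		// with parallelism 1 and a plain forward edge, source and map are chained into one task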
-
-		env.addSource(new TestSource()).map(new TestMap()).addSink(new NoOpSink<Integer>());
-		env.execute();
-
-		assertNotEquals(srcContext, mapContext);
-	}
-
-	private static class TestSource extends RichParallelSourceFunction<Integer> {
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-		}
-
-		@Override
-		public void cancel() {
-		}
-
-		@Override
-		public void open(Configuration c) {
-			srcContext = getRuntimeContext();
-		}
-
-	}
-
-	private static class TestMap extends RichMapFunction<Integer, Integer> {
-
-		@Override
-		public Integer map(Integer value) throws Exception {
-			return value;
-		}
-
-		@Override
-		public void open(Configuration c) {
-			mapContext = getRuntimeContext();
-		}
-
-	}
-
-}
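
Even when operators are chained into a single task, as in this test, each one keeps its
own RuntimeContext, which is exactly what the assertNotEquals verifies. Chaining can
also be steered explicitly; a sketch using the same environment and functions:

    env.addSource(new TestSource())
        .map(new TestMap())     // chained with the source by default
        .disableChaining();     // forces the map into its own task instead

    // alternatively, switch chaining off for the whole job:
    env.disableOperatorChaining();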

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
deleted file mode 100644
index 0f9cbe9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoStreamTest extends StreamingMultipleProgramsTestBase {
-
-	@Test
-	public void test() throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		DataStream<Integer> filter1 = src.filter(new FilterFunction<Integer>() {
-	
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public boolean filter(Integer value) throws Exception {
-				return true;
-			}
-		}).keyBy(new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		});
-
-		DataStream<Tuple2<Integer, Integer>> filter2 = src
-				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-						return new Tuple2<Integer, Integer>(value, value + 1);
-					}
-				})
-				.rebalance()
-				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
-						return true;
-					}
-				}).disableChaining().keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-						return value.f0;
-					}
-				});
-
-		DataStream<String> connected = filter1.connect(filter2).flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap1(Integer value, Collector<String> out) throws Exception {
-				out.collect(value.toString());
-			}
-
-			@Override
-			public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
-				out.collect(value.toString());
-			}
-		});
-
-		connected.addSink(resultSink);
-
-		env.execute();
-
-		List<String> expected = Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5");
-
-		List<String> result = resultSink.getResult();
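-		// output from the two connected inputs interleaves nondeterministically, so compare order-insensitively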
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-}
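
The pattern exercised here generalizes: connect() pairs two streams of different types
without any join semantics, and a CoFlatMapFunction gets one callback per input side.
Reduced to its core, with illustrative stream contents:

    DataStream<Integer> numbers = env.fromElements(1, 3, 5);
    DataStream<String> letters = env.fromElements("a", "b");

    DataStream<String> merged = numbers.connect(letters)
            .flatMap(new CoFlatMapFunction<Integer, String, String>() {

                @Override
                public void flatMap1(Integer value, Collector<String> out) {
                    out.collect("number: " + value);   // records from 'numbers'
                }

                @Override
                public void flatMap2(String value, Collector<String> out) {
                    out.collect("letter: " + value);   // records from 'letters'
                }
            });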