You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:56 UTC

[15/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
new file mode 100644
index 0000000..9860916
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Comparator comparator;
+	public boolean byAggregate;
+	public boolean first;
+
+	public ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+		super(pos);
+		this.comparator = Comparator.getForAggregation(aggregationType);
+		this.byAggregate = (aggregationType == AggregationType.MAXBY)
+				|| (aggregationType == AggregationType.MINBY);
+		this.first = first;
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+			TypeInformation<R> typeInfo, AggregationType aggregationType) {
+		return getAggregator(positionToAggregate, typeInfo, aggregationType, false);
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+
+		if (typeInfo.isTupleType()) {
+			return new TupleComparableAggregator<R>(positionToAggregate, aggregationType, first);
+		} else if (typeInfo instanceof BasicArrayTypeInfo
+				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first);
+		} else {
+			return new SimpleComparableAggregator<R>(aggregationType);
+		}
+	}
+
+	public static <R> AggregationFunction<R> getAggregator(String field,
+			TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first,
+			ExecutionConfig config) {
+
+		return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first, config);
+	}
+
+	private static class TupleComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public TupleComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+			super(pos, aggregationType, first);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+			Tuple tuple1 = (Tuple) value1;
+			Tuple tuple2 = (Tuple) value2;
+
+			Comparable<Object> o1 = tuple1.getField(position);
+			Object o2 = tuple2.getField(position);
+
+			int c = comparator.isExtremal(o1, o2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return (T) tuple1;
+				}
+				if (first) {
+					if (c == 0) {
+						return (T) tuple1;
+					}
+				}
+
+				return (T) tuple2;
+
+			} else {
+				if (c == 1) {
+					tuple2.setField(o1, position);
+				}
+				return (T) tuple2;
+			}
+
+		}
+	}
+
+	private static class ArrayComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public ArrayComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+			super(pos, aggregationType, first);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T array1, T array2) throws Exception {
+
+			Object v1 = Array.get(array1, position);
+			Object v2 = Array.get(array2, position);
+
+			int c = comparator.isExtremal((Comparable<Object>) v1, v2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return array1;
+				}
+				if (first) {
+					if (c == 0) {
+						return array1;
+					}
+				}
+
+				return array2;
+			} else {
+				if (c == 1) {
+					Array.set(array2, position, v1);
+				}
+
+				return array2;
+			}
+		}
+
+	}
+
+	private static class SimpleComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public SimpleComparableAggregator(AggregationType aggregationType) {
+			super(0, aggregationType, false);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			if (comparator.isExtremal((Comparable<Object>) value1, value2) == 1) {
+				return value1;
+			} else {
+				return value2;
+			}
+		}
+
+	}
+
+	private static class PojoComparableAggregator<T> extends ComparableAggregator<T> {
+
+		private static final long serialVersionUID = 1L;
+		PojoComparator<T> pojoComparator;
+
+		public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
+				AggregationType aggregationType, boolean first, ExecutionConfig config) {
+			super(0, aggregationType, first);
+			if (!(typeInfo instanceof CompositeType<?>)) {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types and Tuples. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+
+			@SuppressWarnings("unchecked")
+			CompositeType<T> cType = (CompositeType<T>) typeInfo;
+
+			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
+			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
+
+			if (cType instanceof PojoTypeInfo) {
+				pojoComparator = (PojoComparator<T>) cType.createComparator(
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
+			} else {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Field[] keyFields = pojoComparator.getKeyFields();
+			Object field1 = pojoComparator.accessField(keyFields[0], value1);
+			Object field2 = pojoComparator.accessField(keyFields[0], value2);
+
+			@SuppressWarnings("unchecked")
+			int c = comparator.isExtremal((Comparable<Object>) field1, field2);
+
+			if (byAggregate) {
+				if (c == 1) {
+					return value1;
+				}
+				if (first) {
+					if (c == 0) {
+						return value1;
+					}
+				}
+
+				return value2;
+			} else {
+				if (c == 1) {
+					keyFields[0].set(value2, field1);
+				}
+
+				return value2;
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
new file mode 100644
index 0000000..f85a2e1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+
+public abstract class Comparator implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract <R> int isExtremal(Comparable<R> o1, R o2);
+
+	public static Comparator getForAggregation(AggregationType type) {
+		switch (type) {
+		case MAX:
+			return new MaxComparator();
+		case MIN:
+			return new MinComparator();
+		case MINBY:
+			return new MinByComparator();
+		case MAXBY:
+			return new MaxByComparator();
+		default:
+			throw new IllegalArgumentException("Unsupported aggregation type.");
+		}
+	}
+
+	private static class MaxComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			return o1.compareTo(o2) > 0 ? 1 : 0;
+		}
+
+	}
+
+	private static class MaxByComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			int c = o1.compareTo(o2);
+			if (c > 0) {
+				return 1;
+			}
+			if (c == 0) {
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+	}
+
+	private static class MinByComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			int c = o1.compareTo(o2);
+			if (c < 0) {
+				return 1;
+			}
+			if (c == 0) {
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+	}
+
+	private static class MinComparator extends Comparator {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public <R> int isExtremal(Comparable<R> o1, R o2) {
+			return o1.compareTo(o2) < 0 ? 1 : 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
new file mode 100644
index 0000000..61337c9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class SumAggregator {
+
+	public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz,
+			TypeInformation<T> typeInfo) {
+
+		if (typeInfo.isTupleType()) {
+			return new TupleSumAggregator<T>(pos, SumFunction.getForClass(clazz));
+		} else if (typeInfo instanceof BasicArrayTypeInfo
+				|| typeInfo instanceof PrimitiveArrayTypeInfo) {
+			return new ArraySumAggregator<T>(pos, SumFunction.getForClass(clazz));
+		} else {
+			return new SimpleSumAggregator<T>(SumFunction.getForClass(clazz));
+		}
+
+	}
+
+	public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo,
+			ExecutionConfig config) {
+
+		return new PojoSumAggregator<T>(field, typeInfo, config);
+	}
+
+	private static class TupleSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public TupleSumAggregator(int pos, SumFunction adder) {
+			super(pos);
+			this.adder = adder;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Tuple tuple1 = (Tuple) value1;
+			Tuple tuple2 = (Tuple) value2;
+
+			tuple2.setField(adder.add(tuple1.getField(position), tuple2.getField(position)),
+					position);
+
+			return (T) tuple2;
+		}
+
+	}
+
+	private static class ArraySumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public ArraySumAggregator(int pos, SumFunction adder) {
+			super(pos);
+			this.adder = adder;
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Object v1 = Array.get(value1, position);
+			Object v2 = Array.get(value2, position);
+			Array.set(value2, position, adder.add(v1, v2));
+			return value2;
+		}
+
+	}
+
+	private static class SimpleSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		SumFunction adder;
+
+		public SimpleSumAggregator(SumFunction adder) {
+			super(0);
+			this.adder = adder;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			return (T) adder.add(value1, value2);
+		}
+
+	}
+
+	private static class PojoSumAggregator<T> extends AggregationFunction<T> {
+
+		private static final long serialVersionUID = 1L;
+		SumFunction adder;
+		PojoComparator<T> comparator;
+
+		public PojoSumAggregator(String field, TypeInformation<?> type, ExecutionConfig config) {
+			super(0);
+			if (!(type instanceof CompositeType<?>)) {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types and Tuples. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+
+			@SuppressWarnings("unchecked")
+			CompositeType<T> cType = (CompositeType<T>) type;
+
+			List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field);
+
+			int logicalKeyPosition = fieldDescriptors.get(0).getPosition();
+			Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass();
+
+			adder = SumFunction.getForClass(keyClass);
+
+			if (cType instanceof PojoTypeInfo) {
+				comparator = (PojoComparator<T>) cType.createComparator(
+						new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
+			} else {
+				throw new IllegalArgumentException(
+						"Key expressions are only supported on POJO types. "
+								+ "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+			}
+		}
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+
+			Field[] keyFields = comparator.getKeyFields();
+			Object field1 = comparator.accessField(keyFields[0], value1);
+			Object field2 = comparator.accessField(keyFields[0], value2);
+
+			keyFields[0].set(value2, adder.add(field1, field2));
+
+			return value2;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
new file mode 100644
index 0000000..e5fa4c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.io.Serializable;
+
+public abstract class SumFunction implements Serializable{
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract Object add(Object o1, Object o2);
+
+	public static SumFunction getForClass(Class<?> clazz) {
+
+		if (clazz == Integer.class) {
+			return new IntSum();
+		} else if (clazz == Long.class) {
+			return new LongSum();
+		} else if (clazz == Short.class) {
+			return new ShortSum();
+		} else if (clazz == Double.class) {
+			return new DoubleSum();
+		} else if (clazz == Float.class) {
+			return new FloatSum();
+		} else if (clazz == Byte.class) {
+			return new ByteSum();
+		} else {
+			throw new RuntimeException("DataStream cannot be summed because the class "
+					+ clazz.getSimpleName() + " does not support the + operator.");
+		}
+	}
+
+	public static class IntSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Integer) value1 + (Integer) value2;
+		}
+	}
+
+	public static class LongSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Long) value1 + (Long) value2;
+		}
+	}
+
+	public static class DoubleSum extends SumFunction {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Double) value1 + (Double) value2;
+		}
+	}
+
+	public static class ShortSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (short) ((Short) value1 + (Short) value2);
+		}
+	}
+
+	public static class FloatSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (Float) value1 + (Float) value2;
+		}
+	}
+
+	public static class ByteSum extends SumFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Object add(Object value1, Object value2) {
+			return (byte) ((Byte) value1 + (Byte) value2);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
new file mode 100644
index 0000000..1137dda
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * A CoFlatMapFunction represents a FlatMap transformation with two different
+ * input types.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	public void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
+
+	public void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
new file mode 100644
index 0000000..67ab672
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * A CoMapFunction represents a Map transformation with two different input
+ * types.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	public OUT map1(IN1 value);
+
+	public OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
new file mode 100644
index 0000000..4342dfd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * The CoReduceFunction interface represents a Reduce transformation with two
+ * different input streams. The reduce1 function combines groups of elements of
+ * the first input with the same key to a single value, while reduce2 combines
+ * groups of elements of the second input with the same key to a single value.
+ * Each produced value is mapped to the same type by map1 and map2,
+ * respectively, to form one output stream.
+ * 
+ * The basic syntax for using a grouped CoReduceFunction is as follows:
+ * 
+ * <pre>
+ * <blockquote>
+ * ConnectedDataStream<X> input = ...;
+ * 
+ * ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2)
+ *          .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
+ * </blockquote>
+ * </pre>
+ * <p>
+ * 
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * The core method of CoReduceFunction, combining two values of the first
+	 * input into one value of the same type. The reduce1 function is
+	 * consecutively applied to all values of a group until only a single value
+	 * remains.
+	 *
+	 * @param value1
+	 *            The first value to combine.
+	 * @param value2
+	 *            The second value to combine.
+	 * @return The combined value of both input values.
+	 *
+	 * @throws Exception
+	 *             This method may throw exceptions. Throwing an exception will
+	 *             cause the operation to fail and may trigger recovery.
+	 */
+	public IN1 reduce1(IN1 value1, IN1 value2);
+
+	/**
+	 * The core method of CoReduceFunction, combining two values of the second
+	 * input into one value of the same type. The reduce2 function is
+	 * consecutively applied to all values of a group until only a single value
+	 * remains.
+	 *
+	 * @param value1
+	 *            The first value to combine.
+	 * @param value2
+	 *            The second value to combine.
+	 * @return The combined value of both input values.
+	 *
+	 * @throws Exception
+	 *             This method may throw exceptions. Throwing an exception will
+	 *             cause the operation to fail and may trigger recovery.
+	 */
+	public IN2 reduce2(IN2 value1, IN2 value2);
+
+	/**
+	 * Maps the reduced first input to the output type.
+	 * 
+	 * @param value
+	 *            The reduced value of the first input.
+	 * @return The value mapped to the output type.
+	 */
+	public OUT map1(IN1 value);
+
+	/**
+	 * Maps the reduced second input to the output type.
+	 * 
+	 * @param value
+	 *            The reduced value of the second input.
+	 * @return The value mapped to the output type.
+	 */
+	public OUT map2(IN2 value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
new file mode 100644
index 0000000..1e8d03c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * User function that receives a list of elements from each of two inputs and
+ * emits any number of results through a {@link Collector}.
+ *
+ * @param <IN1>
+ *            Type of the elements of the first input.
+ * @param <IN2>
+ *            Type of the elements of the second input.
+ * @param <O>
+ *            Type of the emitted elements.
+ */
+public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {
+
+	/**
+	 * Processes one pair of element lists.
+	 *
+	 * @param first
+	 *            Elements of the first input (presumably the current window
+	 *            of that input; confirm against the invoking operator).
+	 * @param second
+	 *            Elements of the second input (same assumption as above).
+	 * @param out
+	 *            Collector that receives the results.
+	 * @throws Exception
+	 *             If the implementation fails to process the elements.
+	 */
+	void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
new file mode 100644
index 0000000..e9c0169
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.util.Collector;
+
+public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private CrossFunction<IN1, IN2, OUT> crossFunction;
+
+	public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
+		this.crossFunction = crossFunction;
+	}
+
+	@Override
+	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
+		for (IN1 firstValue : first) {
+			for (IN2 secondValue : second) {
+				out.collect(crossFunction.cross(firstValue, secondValue));
+			}
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
new file mode 100644
index 0000000..6f658c6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link CoWindowFunction} that joins the two element lists on equal keys. The
+ * elements of the first list are grouped by key in a hash map, which is then
+ * probed with the key of every element of the second list. The
+ * {@link JoinFunction} is applied to every matching pair and its result is
+ * emitted.
+ *
+ * @param <IN1>
+ *            Type of the elements of the first input.
+ * @param <IN2>
+ *            Type of the elements of the second input.
+ * @param <OUT>
+ *            Type of the values produced by the join function.
+ */
+public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN1, ?> keySelector1;
+	private KeySelector<IN2, ?> keySelector2;
+	private JoinFunction<IN1, IN2, OUT> joinFunction;
+
+	/**
+	 * @param keySelector1
+	 *            Extracts the join key from elements of the first input.
+	 * @param keySelector2
+	 *            Extracts the join key from elements of the second input.
+	 * @param joinFunction
+	 *            Function applied to every pair of elements with equal keys.
+	 */
+	public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
+			JoinFunction<IN1, IN2, OUT> joinFunction) {
+		this.keySelector1 = keySelector1;
+		this.keySelector2 = keySelector2;
+		this.joinFunction = joinFunction;
+	}
+
+	/**
+	 * Emits the join result for every pair whose keys are equal according to
+	 * the hash map lookup. Results follow the order of the second list; the
+	 * matches of one second-list element follow the order of the first list.
+	 */
+	@Override
+	public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
+		Map<Object, List<IN1>> firstByKey = groupFirstByKey(first);
+
+		for (IN2 right : second) {
+			List<IN1> partners = firstByKey.get(keySelector2.getKey(right));
+			if (partners == null) {
+				// No element of the first list shares this key.
+				continue;
+			}
+			for (IN1 left : partners) {
+				out.collect(joinFunction.join(left, right));
+			}
+		}
+	}
+
+	/**
+	 * Groups the given elements of the first input by the key extracted with
+	 * the first key selector, keeping the elements of a group in encounter
+	 * order.
+	 */
+	private Map<Object, List<IN1>> groupFirstByKey(List<IN1> records) throws Exception {
+		Map<Object, List<IN1>> grouped = new HashMap<Object, List<IN1>>();
+
+		for (IN1 record : records) {
+			Object key = keySelector1.getKey(record);
+			List<IN1> bucket = grouped.get(key);
+			if (bucket == null) {
+				bucket = new LinkedList<IN1>();
+				grouped.put(key, bucket);
+			}
+			bucket.add(record);
+		}
+
+		return grouped;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
new file mode 100644
index 0000000..6746140
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoFlatMapFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
new file mode 100644
index 0000000..e561408
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoMapFunction represents a Map transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoMapFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
new file mode 100644
index 0000000..d3e6f3a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoReduceFunction represents a Reduce transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoReduceFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
new file mode 100644
index 0000000..e317065
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
+		CoWindowFunction<IN1, IN2, O> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
+			throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
new file mode 100644
index 0000000..031029f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples in the specified
+ * OutputFormat format. Tuples are collected to a list; the list is written to
+ * the format whenever {@link #updateCondition()} holds after a tuple was
+ * added, and once more when the sink is closed. The target path and the
+ * overwrite mode are pre-packaged in format.
+ *
+ * <p>
+ * Failures while writing or closing are propagated to the caller instead of
+ * being dropped, so that lost output makes the operation fail.
+ *
+ * @param <IN>
+ *            Input type
+ */
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
+	protected ArrayList<IN> tupleList = new ArrayList<IN>();
+	protected volatile OutputFormat<IN> format;
+	protected volatile boolean cleanupCalled = false;
+	protected int indexInSubtaskGroup;
+	protected int currentNumberOfSubtasks;
+
+	/**
+	 * @param format
+	 *            Output format that receives the buffered tuples.
+	 */
+	public FileSinkFunction(OutputFormat<IN> format) {
+		this.format = format;
+	}
+
+	/**
+	 * Configures the output format with the task stub parameters and opens it
+	 * for this subtask.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		format.configure(context.getTaskStubParameters());
+		indexInSubtaskGroup = context.getIndexOfThisSubtask();
+		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
+		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
+	}
+
+	/**
+	 * Buffers the record and flushes the buffer if {@link #updateCondition()}
+	 * holds afterwards.
+	 */
+	@Override
+	public void invoke(IN record) throws Exception {
+		tupleList.add(record);
+		if (updateCondition()) {
+			flush();
+		}
+	}
+
+	/**
+	 * Writes any remaining buffered tuples and closes the output format. The
+	 * format is closed even if the final flush fails.
+	 *
+	 * @throws IOException
+	 *             If closing the output format fails and no earlier flush
+	 *             failure is pending.
+	 * @throws RuntimeException
+	 *             If writing the remaining tuples fails.
+	 */
+	@Override
+	public void close() throws IOException {
+		RuntimeException flushFailure = null;
+		if (!tupleList.isEmpty()) {
+			try {
+				flush();
+			} catch (RuntimeException ex) {
+				// Remember the failure, but still release the format below.
+				flushFailure = ex;
+			}
+		}
+
+		try {
+			format.close();
+		} catch (Exception ex) {
+			cleanupOnError();
+			if (flushFailure == null) {
+				if (ex instanceof IOException) {
+					throw (IOException) ex;
+				}
+				throw new IOException("Closing the output format failed.", ex);
+			}
+			// The earlier flush failure is reported to the caller; log this one.
+			LOG.error("Closing the output format failed.", ex);
+		}
+
+		if (flushFailure != null) {
+			throw flushFailure;
+		}
+	}
+
+	/**
+	 * Writes all buffered tuples to the output format and then calls
+	 * {@link #resetParameters()}. If a write fails, the format's error cleanup
+	 * is attempted, {@link #resetParameters()} is still called, and the
+	 * failure is rethrown.
+	 *
+	 * @throws RuntimeException
+	 *             Wrapping the original exception if a tuple cannot be
+	 *             written.
+	 */
+	protected void flush() {
+		try {
+			for (IN rec : tupleList) {
+				format.writeRecord(rec);
+			}
+		} catch (Exception ex) {
+			cleanupOnError();
+			resetParameters();
+			throw new RuntimeException("Writing the buffered tuples to the output format failed.", ex);
+		}
+		resetParameters();
+	}
+
+	/**
+	 * Invokes the format's error cleanup at most once, if the format supports
+	 * it. A failure of the cleanup itself is only logged so that it does not
+	 * hide the error that triggered the cleanup.
+	 */
+	private void cleanupOnError() {
+		try {
+			if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+				cleanupCalled = true;
+				((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+			}
+		} catch (Throwable t) {
+			LOG.error("Cleanup on error failed.", t);
+		}
+	}
+
+	/**
+	 * Condition for writing the contents of tupleList and clearing it.
+	 * 
+	 * @return value of the updating condition
+	 */
+	protected abstract boolean updateCondition();
+
+	/**
+	 * Statements to be executed after writing a batch goes here.
+	 */
+	protected abstract void resetParameters();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
new file mode 100644
index 0000000..86bbb53
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * Implementation of FileSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input type
+ */
+public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private final long millis;
+	private long lastTime;
+
+	public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
+		super(format);
+		this.millis = millis;
+		lastTime = System.currentTimeMillis();
+	}
+
+	/**
+	 * Condition for writing the contents of tupleList and clearing it.
+	 * 
+	 * @return value of the updating condition
+	 */
+	@Override
+	protected boolean updateCondition() {
+		return System.currentTimeMillis() - lastTime >= millis;
+	}
+
+	/**
+	 * Statements to be executed after writing a batch goes here.
+	 */
+	@Override
+	protected void resetParameters() {
+		tupleList.clear();
+		lastTime = System.currentTimeMillis();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
new file mode 100644
index 0000000..5a9c7a8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+/**
+ * Implementation of the SinkFunction writing every tuple to the standard
+ * output or standard error stream, one tuple per line. If the sink runs with a
+ * parallelism greater than one, every line is prefixed with the index of the
+ * printing subtask plus one, followed by "> ".
+ * 
+ * @param <IN>
+ *            Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final boolean STD_OUT = false;
+	private static final boolean STD_ERR = true;
+	
+	private boolean target; 
+	private transient PrintStream stream;
+	private transient String prefix;
+	
+	/**
+	 * Instantiates a print sink function that prints to standard out.
+	 */
+	public PrintSinkFunction() {}
+	
+	/**
+	 * Instantiates a print sink function that prints to standard out or to
+	 * standard error.
+	 * 
+	 * @param stdErr True, if the format should print to standard error instead of standard out.
+	 */
+	public PrintSinkFunction(boolean stdErr) {
+		target = stdErr;
+	}
+
+	/**
+	 * Selects standard out as the target stream. The stream is resolved in
+	 * {@link #open(Configuration)}, so the change only affects sinks that are
+	 * opened afterwards.
+	 */
+	public void setTargetToStandardOut() {
+		target = STD_OUT;
+	}
+	
+	/**
+	 * Selects standard error as the target stream. The stream is resolved in
+	 * {@link #open(Configuration)}, so the change only affects sinks that are
+	 * opened afterwards.
+	 */
+	public void setTargetToStandardErr() {
+		target = STD_ERR;
+	}
+	
+	/**
+	 * Resolves the target stream and computes the line prefix for this
+	 * subtask.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		// get the target stream
+		stream = target == STD_OUT ? System.out : System.err;
+		
+		// set the prefix if we have a >1 parallelism
+		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
+				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+	}
+
+	/**
+	 * Prints the record on its own line, preceded by the subtask prefix if
+	 * one is set. A null record is printed as "null" instead of raising a
+	 * NullPointerException.
+	 */
+	@Override
+	public void invoke(IN record) {
+		// String conversion is null-safe, unlike calling record.toString().
+		if (prefix != null) {
+			stream.println(prefix + record);
+		}
+		else {
+			stream.println(String.valueOf(record));
+		}
+	}
+	
+	/**
+	 * Drops the references to the stream and the prefix.
+	 */
+	@Override
+	public void close() {
+		this.stream = null;
+		this.prefix = null;
+	}
+	
+	@Override
+	public String toString() {
+		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
new file mode 100644
index 0000000..7853758
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for {@link SinkFunction} implementations that also extend
+ * {@link AbstractRichFunction}.
+ *
+ * @param <IN>
+ *            Input type parameter.
+ */
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Function for standard sink behaviour. This function is called for every
+	 * record.
+	 *
+	 * @param value
+	 *            The input record.
+	 * @throws Exception
+	 *             If the record cannot be processed.
+	 */
+	@Override
+	public abstract void invoke(IN value) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
new file mode 100644
index 0000000..9c7ceeb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Contract for user defined sink functionality.
+ *
+ * @param <IN> Type of the records consumed by the sink.
+ */
+public interface SinkFunction<IN> extends Function, Serializable {
+
+	/**
+	 * Standard sink behaviour: consumes a single record. This function is
+	 * called for every record.
+	 *
+	 * @param value The input record.
+	 * @throws Exception If the record cannot be processed.
+	 */
+	void invoke(IN value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
new file mode 100644
index 0000000..cd6c21c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. Every record is turned into a
+ * byte array by the given {@link SerializationSchema} and written to the Socket.
+ *
+ * <p>The connection is established in {@link #open(Configuration)} and released
+ * in {@link #close()}.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final String hostName;
+	private final int port;
+	private final SerializationSchema<IN, byte[]> schema;
+	// The connection objects are created in open() and are therefore not serialized.
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+
+	/**
+	 * Default constructor.
+	 *
+	 * @param hostName Host of the Socket server.
+	 * @param port Port of the Socket.
+	 * @param schema Schema of the data.
+	 */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+		this.hostName = hostName;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 *
+	 * <p>The fields of this sink are only updated once both the socket and its
+	 * output stream have been obtained; a socket whose output stream cannot be
+	 * obtained is closed again instead of being leaked.
+	 *
+	 * @throws RuntimeException if the socket cannot be opened or its output
+	 *             stream cannot be obtained.
+	 */
+	public void intializeConnection() {
+		Socket socket = null;
+		try {
+			socket = new Socket(hostName, port);
+			OutputStream outputStream = socket.getOutputStream();
+			client = socket;
+			dataOutputStream = new DataOutputStream(outputStream);
+		} catch (IOException e) {
+			if (socket != null) {
+				// The socket was opened but is unusable: release it before failing.
+				try {
+					socket.close();
+				} catch (IOException closeException) {
+					LOG.error("Cannot close connection with socket server at "
+							+ hostName + ":" + port, closeException);
+				}
+			}
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * <p>A failed write is logged and the record is dropped; no exception is
+	 * propagated to the caller.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] msg = schema.serialize(value);
+		try {
+			dataOutputStream.write(msg);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 *
+	 * <p>Does nothing if no connection has been established, so that closing a
+	 * sink whose initialization failed does not raise a NullPointerException.
+	 *
+	 * @throws RuntimeException if flushing the pending data or closing the
+	 *             socket fails.
+	 */
+	private void closeConnection(){
+		if (client == null) {
+			// open() never succeeded, there is nothing to flush or close.
+			return;
+		}
+		try {
+			if (dataOutputStream != null) {
+				dataOutputStream.flush();
+			}
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostName + ":" + port, e);
+		} finally {
+			// The flush may have failed before the socket was closed; make sure
+			// the socket is released in that case as well.
+			try {
+				client.close();
+			} catch (IOException e) {
+				LOG.error("Cannot close connection with socket server at "
+						+ hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		intializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
new file mode 100644
index 0000000..019d35f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * Abstract class for formatting the output of the writeAsText and writeAsCsv
+ * functions. Subclasses decide how a batch of tuples is rendered into the
+ * target file.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteFormat<IN> implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Writes the contents of tupleList to the file specified by path.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param tupleList
+	 *            is the list of tuples to be written
+	 */
+	protected abstract void write(String path, ArrayList<IN> tupleList);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
new file mode 100644
index 0000000..bfae653
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in csv format.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	protected void write(String path, ArrayList<IN> tupleList) {
+		try {
+			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+			for (IN tupleToWrite : tupleList) {
+				String strTuple = tupleToWrite.toString();
+				outStream.println(strTuple.substring(1, strTuple.length() - 1));
+			}
+			outStream.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occured while writing file " + path, e);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
new file mode 100644
index 0000000..03fcb5c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in text format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsText<IN> extends WriteFormat<IN> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void write(String path, ArrayList<IN> tupleList) {
+		try {
+			PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+			for (IN tupleToWrite : tupleList) {
+				outStream.println(tupleToWrite);
+			}
+			outStream.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Exception occured while writing file " + path, e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
new file mode 100644
index 0000000..b25806d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Base class for sinks that buffer the incoming tuples in a list and hand the
+ * list to a {@link WriteFormat}, which writes it to the file specified by
+ * path, whenever {@link #updateCondition()} holds. The target file is created
+ * if it does not exist and emptied if it does when the sink is constructed.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	protected final String path;
+	protected ArrayList<IN> tupleList = new ArrayList<IN>();
+	protected WriteFormat<IN> format;
+
+	/**
+	 * Stores the target path and the format and resets the target file.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 * @param format
+	 *            is the format used for writing a batch of tuples
+	 */
+	public WriteSinkFunction(String path, WriteFormat<IN> format) {
+		this.format = format;
+		this.path = path;
+		cleanFile(path);
+	}
+
+	/**
+	 * Makes sure the target file exists and is empty.
+	 * 
+	 * @param path
+	 *            is the path to the location where the tuples are written
+	 */
+	protected void cleanFile(String path) {
+		final PrintWriter truncatingWriter;
+		try {
+			// Opening a PrintWriter on the path creates the file or truncates it.
+			truncatingWriter = new PrintWriter(path);
+		} catch (FileNotFoundException e) {
+			throw new RuntimeException("File not found " + path, e);
+		}
+		truncatingWriter.print("");
+		truncatingWriter.close();
+	}
+
+	/**
+	 * Tells whether the buffered tuples have to be written out now.
+	 * 
+	 * @return true if the contents of tupleList should be written
+	 */
+	protected abstract boolean updateCondition();
+
+	/**
+	 * Hook that is executed right after a batch has been written.
+	 */
+	protected abstract void resetParameters();
+
+	/**
+	 * Buffers the incoming tuple in tupleList. If updateCondition() is true
+	 * afterwards, the list is passed to the format to be written to the
+	 * target file and resetParameters() is called.
+	 */
+	@Override
+	public void invoke(IN tuple) {
+		tupleList.add(tuple);
+
+		if (!updateCondition()) {
+			// Not time to write yet, keep buffering.
+			return;
+		}
+
+		format.write(path, tupleList);
+		resetParameters();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
new file mode 100644
index 0000000..0364174
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+/**
+ * Implementation of WriteSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ * 
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private final long millis;
+	private long lastTime;
+
+	public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
+		super(path, format);
+		this.millis = millis;
+		lastTime = System.currentTimeMillis();
+	}
+
+	@Override
+	protected boolean updateCondition() {
+		return System.currentTimeMillis() - lastTime >= millis;
+	}
+
+	@Override
+	protected void resetParameters() {
+		tupleList.clear();
+		lastTime = System.currentTimeMillis();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
new file mode 100644
index 0000000..68ff532
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
+
+	public enum WatchType {
+		ONLY_NEW_FILES, // Only new files will be processed.
+		REPROCESS_WITH_APPENDED, // When some files are appended, all contents
+									// of the files will be processed.
+		PROCESS_ONLY_APPENDED // When some files are appended, only appended
+								// contents will be processed.
+	}
+
+	private String path;
+	private long interval;
+	private WatchType watchType;
+
+	private FileSystem fileSystem;
+	private Map<String, Long> offsetOfFiles;
+	private Map<String, Long> modificationTimes;
+
+	private volatile boolean isRunning = false;
+
+	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
+		this.path = path;
+		this.interval = interval;
+		this.watchType = watchType;
+		this.modificationTimes = new HashMap<String, Long>();
+		this.offsetOfFiles = new HashMap<String, Long>();
+	}
+
+	@Override
+	public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+		isRunning = true;
+		fileSystem = FileSystem.get(new URI(path));
+
+		while (isRunning) {
+			List<String> files = listNewFiles();
+			for (String filePath : files) {
+				if (watchType == WatchType.ONLY_NEW_FILES
+						|| watchType == WatchType.REPROCESS_WITH_APPENDED) {
+					collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
+					offsetOfFiles.put(filePath, -1L);
+				} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
+					long offset = 0;
+					long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
+					if (offsetOfFiles.containsKey(filePath)) {
+						offset = offsetOfFiles.get(filePath);
+					}
+
+					collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
+					offsetOfFiles.put(filePath, fileSize);
+
+					LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
+				}
+			}
+
+			Thread.sleep(interval);
+		}
+	}
+
+	private List<String> listNewFiles() throws IOException {
+		List<String> files = new ArrayList<String>();
+
+		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+
+		for (FileStatus status : statuses) {
+			Path filePath = status.getPath();
+			String fileName = filePath.getName();
+			long modificationTime = status.getModificationTime();
+
+			if (!isFiltered(fileName, modificationTime)) {
+				files.add(filePath.toString());
+				modificationTimes.put(fileName, modificationTime);
+			}
+		}
+		return files;
+	}
+
+	private boolean isFiltered(String fileName, long modificationTime) {
+
+		if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
+				|| fileName.startsWith(".") || fileName.contains("_COPYING_")) {
+			return true;
+		} else {
+			Long lastModification = modificationTimes.get(fileName);
+			if (lastModification == null) {
+				return false;
+			} else {
+				return lastModification >= modificationTime;
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
new file mode 100644
index 0000000..945f953
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+/**
+ * Reads the lines of a file described by a (file path, start offset, end
+ * offset) tuple and emits every line as a String.
+ *
+ * <p>The bytes are decoded with the platform default charset, since the
+ * {@link InputStreamReader} is created without an explicit charset.
+ */
+public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Opens the file {@code value.f0}, seeks to {@code value.f1} and collects
+	 * lines until the end of the file is reached or, unless {@code value.f2}
+	 * is -1, until the position of the stream exceeds {@code value.f2}.
+	 *
+	 * @param value
+	 *            the file path, the offset to start reading from and the end
+	 *            offset (-1 for no upper bound)
+	 * @param out
+	 *            the collector receiving the lines
+	 * @throws Exception
+	 *             if the file cannot be opened, positioned or read
+	 */
+	@Override
+	public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
+		FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
+		// Wrapping the stream does not read from it, so seeking afterwards is safe.
+		BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+
+		try {
+			// The seek is inside the try block so that the stream is also
+			// released when positioning fails.
+			stream.seek(value.f1);
+
+			// NOTE(review): getPos() is the position of the underlying stream,
+			// which may be ahead of the lines returned so far because the
+			// reader buffers its input - confirm the upper bound is as intended.
+			String line;
+			while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
+				out.collect(line);
+			}
+		} finally {
+			// Closing the reader closes the wrapped stream as well.
+			reader.close();
+		}
+	}
+}