You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:56 UTC
[15/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
new file mode 100644
index 0000000..9860916
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Comparison strategy; the constructor obtains it from Comparator.getForAggregation for the aggregation type.
+ public Comparator comparator;
+ // Set by the constructor to true for MAXBY/MINBY. The tuple/array/POJO reducers then return one of
+ // the two inputs unchanged instead of copying the winning field value into the second input.
+ public boolean byAggregate;
+ // Tie-break used by the tuple/array/POJO reducers when byAggregate is set: on equal keys the first operand is returned.
+ public boolean first;
+
+ /**
+  * Creates an aggregator working on position {@code pos}.
+  *
+  * @param pos
+  *            Position handed to the {@code AggregationFunction} super constructor.
+  * @param aggregationType
+  *            Selects the comparison strategy; MAXBY and MINBY additionally
+  *            switch the aggregator into "by" mode.
+  * @param first
+  *            Tie-break flag stored for the "by" aggregations.
+  */
+ public ComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+     super(pos);
+     this.first = first;
+
+     boolean isMaxBy = aggregationType == AggregationType.MAXBY;
+     boolean isMinBy = aggregationType == AggregationType.MINBY;
+     this.byAggregate = isMaxBy || isMinBy;
+
+     this.comparator = Comparator.getForAggregation(aggregationType);
+ }
+
+ /**
+ * Convenience overload: delegates to the four-argument getAggregator with
+ * {@code first} set to {@code false}.
+ */
+ public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+ TypeInformation<R> typeInfo, AggregationType aggregationType) {
+ return getAggregator(positionToAggregate, typeInfo, aggregationType, false);
+ }
+
+ /**
+  * Picks the aggregator implementation matching the shape of {@code typeInfo}:
+  * tuple types get a tuple aggregator, basic/primitive array types get an
+  * array aggregator, and everything else is compared as a whole value (in
+  * which case {@code positionToAggregate} and {@code first} are not used).
+  */
+ public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
+         TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+
+     if (typeInfo.isTupleType()) {
+         return new TupleComparableAggregator<R>(positionToAggregate, aggregationType, first);
+     }
+
+     boolean isArrayType = typeInfo instanceof BasicArrayTypeInfo
+             || typeInfo instanceof PrimitiveArrayTypeInfo;
+     if (isArrayType) {
+         return new ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first);
+     }
+
+     return new SimpleComparableAggregator<R>(aggregationType);
+ }
+
+ /**
+ * Creates an aggregator that compares the POJO field addressed by the field
+ * expression {@code field}. All arguments are passed through to
+ * PojoComparableAggregator, whose constructor throws an
+ * IllegalArgumentException for types that are not POJO types.
+ */
+ public static <R> AggregationFunction<R> getAggregator(String field,
+ TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first,
+ ExecutionConfig config) {
+
+ return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first, config);
+ }
+
+ /**
+  * Aggregator for tuple values: compares the fields stored at
+  * {@code position} of the two tuples.
+  */
+ private static class TupleComparableAggregator<T> extends ComparableAggregator<T> {
+
+     private static final long serialVersionUID = 1L;
+
+     public TupleComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+         super(pos, aggregationType, first);
+     }
+
+     /**
+      * In "by" mode returns one of the two tuples as a whole (the first one if
+      * its field is extremal, or on a tie when {@code first} is set). Otherwise
+      * returns the second tuple, after overwriting its field with the first
+      * tuple's field when that one is extremal. The second tuple is modified
+      * in place in that case.
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Tuple left = (Tuple) value1;
+         Tuple right = (Tuple) value2;
+
+         Comparable<Object> leftKey = left.getField(position);
+         Object rightKey = right.getField(position);
+         int verdict = comparator.isExtremal(leftKey, rightKey);
+
+         if (!byAggregate) {
+             // Plain min/max: only the winning field value is carried over.
+             if (verdict == 1) {
+                 right.setField(leftKey, position);
+             }
+             return (T) right;
+         }
+
+         boolean keepLeft = verdict == 1 || (first && verdict == 0);
+         return (T) (keepLeft ? left : right);
+     }
+ }
+
+ /**
+  * Aggregator for array values: compares the elements stored at index
+  * {@code position} of the two arrays, accessed reflectively.
+  */
+ private static class ArrayComparableAggregator<T> extends ComparableAggregator<T> {
+
+     private static final long serialVersionUID = 1L;
+
+     public ArrayComparableAggregator(int pos, AggregationType aggregationType, boolean first) {
+         super(pos, aggregationType, first);
+     }
+
+     /**
+      * In "by" mode returns one of the two arrays as a whole (the first one if
+      * its element is extremal, or on a tie when {@code first} is set).
+      * Otherwise returns the second array, after overwriting its element with
+      * the first array's element when that one is extremal.
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public T reduce(T array1, T array2) throws Exception {
+         Object leftElement = Array.get(array1, position);
+         Object rightElement = Array.get(array2, position);
+
+         int verdict = comparator.isExtremal((Comparable<Object>) leftElement, rightElement);
+
+         if (!byAggregate) {
+             // Plain min/max: only the winning element is carried over.
+             if (verdict == 1) {
+                 Array.set(array2, position, leftElement);
+             }
+             return array2;
+         }
+
+         boolean keepFirstArray = verdict == 1 || (first && verdict == 0);
+         return keepFirstArray ? array1 : array2;
+     }
+
+ }
+
+ /**
+  * Aggregator for values that are compared as a whole. It is constructed
+  * with position 0 and {@code first = false}.
+  */
+ private static class SimpleComparableAggregator<T> extends ComparableAggregator<T> {
+
+     private static final long serialVersionUID = 1L;
+
+     public SimpleComparableAggregator(AggregationType aggregationType) {
+         super(0, aggregationType, false);
+     }
+
+     /**
+      * Returns {@code value1} when the comparator reports it as extremal
+      * (result 1), and {@code value2} in every other case.
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Comparable<Object> candidate = (Comparable<Object>) value1;
+         return comparator.isExtremal(candidate, value2) == 1 ? value1 : value2;
+     }
+
+ }
+
+ /**
+  * Aggregator for POJO values: compares the field addressed by a field
+  * expression, reading and writing it through the key field exposed by a
+  * {@link PojoComparator}.
+  */
+ private static class PojoComparableAggregator<T> extends ComparableAggregator<T> {
+
+     private static final long serialVersionUID = 1L;
+     PojoComparator<T> pojoComparator;
+
+     /**
+      * @throws IllegalArgumentException
+      *             if {@code typeInfo} is not a composite type, or is a
+      *             composite type other than a POJO type.
+      */
+     public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
+             AggregationType aggregationType, boolean first, ExecutionConfig config) {
+         super(0, aggregationType, first);
+
+         boolean isComposite = typeInfo instanceof CompositeType<?>;
+         if (!isComposite) {
+             throw new IllegalArgumentException(
+                     "Key expressions are only supported on POJO types and Tuples. "
+                             + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+         }
+
+         @SuppressWarnings("unchecked")
+         CompositeType<T> compositeType = (CompositeType<T>) typeInfo;
+
+         // Only the first flat field matched by the expression serves as key.
+         FlatFieldDescriptor keyDescriptor = compositeType.getFlatFields(field).get(0);
+         int keyPosition = keyDescriptor.getPosition();
+
+         if (!(compositeType instanceof PojoTypeInfo)) {
+             throw new IllegalArgumentException(
+                     "Key expressions are only supported on POJO types. "
+                             + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+         }
+
+         int[] keyPositions = { keyPosition };
+         boolean[] orders = { false };
+         pojoComparator = (PojoComparator<T>) compositeType.createComparator(keyPositions,
+                 orders, 0, config);
+     }
+
+     /**
+      * In "by" mode returns one of the two objects as a whole (the first one if
+      * its key field is extremal, or on a tie when {@code first} is set).
+      * Otherwise returns the second object, after overwriting its key field
+      * with the first object's value when that one is extremal.
+      */
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Field keyField = pojoComparator.getKeyFields()[0];
+         Object key1 = pojoComparator.accessField(keyField, value1);
+         Object key2 = pojoComparator.accessField(keyField, value2);
+
+         @SuppressWarnings("unchecked")
+         Comparable<Object> comparableKey1 = (Comparable<Object>) key1;
+         int verdict = comparator.isExtremal(comparableKey1, key2);
+
+         if (!byAggregate) {
+             // Plain min/max: only the winning field value is carried over.
+             if (verdict == 1) {
+                 keyField.set(value2, key1);
+             }
+             return value2;
+         }
+
+         boolean keepFirstValue = verdict == 1 || (first && verdict == 0);
+         return keepFirstValue ? value1 : value2;
+     }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
new file mode 100644
index 0000000..f85a2e1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+
+public abstract class Comparator implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Compares {@code o1} with {@code o2} for the aggregation this comparator stands for.
+ *
+ * @return 1 if {@code o1} is the extremal value. The min/max implementations
+ * in this class return 0 in every other case; the "by"
+ * implementations return 0 on a tie and -1 when {@code o2} is
+ * extremal.
+ */
+ public abstract <R> int isExtremal(Comparable<R> o1, R o2);
+
+ /**
+  * Returns the comparator implementing the given aggregation type.
+  *
+  * @throws IllegalArgumentException
+  *             if {@code type} is none of MAX, MIN, MINBY or MAXBY.
+  */
+ public static Comparator getForAggregation(AggregationType type) {
+     final Comparator selected;
+     switch (type) {
+     case MAX:
+         selected = new MaxComparator();
+         break;
+     case MIN:
+         selected = new MinComparator();
+         break;
+     case MINBY:
+         selected = new MinByComparator();
+         break;
+     case MAXBY:
+         selected = new MaxByComparator();
+         break;
+     default:
+         throw new IllegalArgumentException("Unsupported aggregation type.");
+     }
+     return selected;
+ }
+
+ private static class MaxComparator extends Comparator {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public <R> int isExtremal(Comparable<R> o1, R o2) {
+ return o1.compareTo(o2) > 0 ? 1 : 0;
+ }
+
+ }
+
+ /**
+  * Reports 1 when the first operand is greater, 0 on a tie and -1 when the
+  * second operand is greater.
+  */
+ private static class MaxByComparator extends Comparator {
+
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public <R> int isExtremal(Comparable<R> o1, R o2) {
+         // Normalizes the compareTo result to exactly -1, 0 or 1.
+         return Integer.signum(o1.compareTo(o2));
+     }
+
+ }
+
+ /**
+  * Reports 1 when the first operand is smaller, 0 on a tie and -1 when the
+  * second operand is smaller.
+  */
+ private static class MinByComparator extends Comparator {
+
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public <R> int isExtremal(Comparable<R> o1, R o2) {
+         // Negate after normalizing, so an Integer.MIN_VALUE result cannot overflow.
+         return -Integer.signum(o1.compareTo(o2));
+     }
+
+ }
+
+ private static class MinComparator extends Comparator {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public <R> int isExtremal(Comparable<R> o1, R o2) {
+ return o1.compareTo(o2) < 0 ? 1 : 0;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
new file mode 100644
index 0000000..61337c9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
+
+public abstract class SumAggregator {
+
+ /**
+  * Builds a summing reducer matching the shape of {@code typeInfo}: tuple
+  * types and basic/primitive array types are summed at position {@code pos},
+  * every other type is summed as a whole value. The adder is looked up via
+  * {@code SumFunction.getForClass(clazz)}.
+  */
+ public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz,
+         TypeInformation<T> typeInfo) {
+
+     boolean isTuple = typeInfo.isTupleType();
+     boolean isArray = typeInfo instanceof BasicArrayTypeInfo
+             || typeInfo instanceof PrimitiveArrayTypeInfo;
+
+     SumFunction adder = SumFunction.getForClass(clazz);
+
+     if (isTuple) {
+         return new TupleSumAggregator<T>(pos, adder);
+     }
+     if (isArray) {
+         return new ArraySumAggregator<T>(pos, adder);
+     }
+     return new SimpleSumAggregator<T>(adder);
+ }
+
+ /**
+ * Creates a summing reducer for the POJO field addressed by the field
+ * expression {@code field}. All arguments are passed through to
+ * PojoSumAggregator, whose constructor throws an IllegalArgumentException
+ * for types that are not POJO types.
+ */
+ public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo,
+ ExecutionConfig config) {
+
+ return new PojoSumAggregator<T>(field, typeInfo, config);
+ }
+
+ /** Sums the fields stored at {@code position} of two tuples. */
+ private static class TupleSumAggregator<T> extends AggregationFunction<T> {
+
+     private static final long serialVersionUID = 1L;
+
+     SumFunction adder;
+
+     public TupleSumAggregator(int pos, SumFunction adder) {
+         super(pos);
+         this.adder = adder;
+     }
+
+     /**
+      * Writes the sum of both fields into the second tuple and returns that
+      * tuple; the second tuple is modified in place.
+      */
+     @SuppressWarnings("unchecked")
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Tuple left = (Tuple) value1;
+         Tuple right = (Tuple) value2;
+
+         Object leftField = left.getField(position);
+         Object rightField = right.getField(position);
+         right.setField(adder.add(leftField, rightField), position);
+
+         return (T) right;
+     }
+
+ }
+
+ /** Sums the elements stored at index {@code position} of two arrays. */
+ private static class ArraySumAggregator<T> extends AggregationFunction<T> {
+
+     private static final long serialVersionUID = 1L;
+
+     SumFunction adder;
+
+     public ArraySumAggregator(int pos, SumFunction adder) {
+         super(pos);
+         this.adder = adder;
+     }
+
+     /**
+      * Writes the sum of both elements into the second array and returns that
+      * array; the second array is modified in place.
+      */
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Object leftElement = Array.get(value1, position);
+         Object rightElement = Array.get(value2, position);
+
+         Object sum = adder.add(leftElement, rightElement);
+         Array.set(value2, position, sum);
+
+         return value2;
+     }
+
+ }
+
+ private static class SimpleSumAggregator<T> extends AggregationFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ SumFunction adder;
+
+ public SimpleSumAggregator(SumFunction adder) {
+ super(0);
+ this.adder = adder;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T reduce(T value1, T value2) throws Exception {
+
+ return (T) adder.add(value1, value2);
+ }
+
+ }
+
+ /**
+  * Sums the POJO field addressed by a field expression, reading and writing
+  * it through the key field exposed by a {@link PojoComparator}.
+  */
+ private static class PojoSumAggregator<T> extends AggregationFunction<T> {
+
+     private static final long serialVersionUID = 1L;
+     SumFunction adder;
+     PojoComparator<T> comparator;
+
+     /**
+      * @throws IllegalArgumentException
+      *             if {@code type} is not a composite type, or is a composite
+      *             type other than a POJO type.
+      */
+     public PojoSumAggregator(String field, TypeInformation<?> type, ExecutionConfig config) {
+         super(0);
+
+         boolean isComposite = type instanceof CompositeType<?>;
+         if (!isComposite) {
+             throw new IllegalArgumentException(
+                     "Key expressions are only supported on POJO types and Tuples. "
+                             + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+         }
+
+         @SuppressWarnings("unchecked")
+         CompositeType<T> compositeType = (CompositeType<T>) type;
+
+         // Only the first flat field matched by the expression is summed.
+         FlatFieldDescriptor keyDescriptor = compositeType.getFlatFields(field).get(0);
+         int keyPosition = keyDescriptor.getPosition();
+
+         // The adder is chosen from the class of the addressed field.
+         adder = SumFunction.getForClass(keyDescriptor.getType().getTypeClass());
+
+         if (!(compositeType instanceof PojoTypeInfo)) {
+             throw new IllegalArgumentException(
+                     "Key expressions are only supported on POJO types. "
+                             + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
+         }
+
+         int[] keyPositions = { keyPosition };
+         boolean[] orders = { false };
+         comparator = (PojoComparator<T>) compositeType.createComparator(keyPositions, orders,
+                 0, config);
+     }
+
+     /**
+      * Writes the sum of both key fields into the second object and returns
+      * that object; the second object is modified in place.
+      */
+     @Override
+     public T reduce(T value1, T value2) throws Exception {
+         Field keyField = comparator.getKeyFields()[0];
+         Object leftField = comparator.accessField(keyField, value1);
+         Object rightField = comparator.accessField(keyField, value2);
+
+         Object sum = adder.add(leftField, rightField);
+         keyField.set(value2, sum);
+
+         return value2;
+     }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
new file mode 100644
index 0000000..e5fa4c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.aggregation;
+
+import java.io.Serializable;
+
+public abstract class SumFunction implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Adds the two operands. The implementations in this class cast both
+ * operands to their own boxed numeric type, so passing a value of another
+ * type fails with a ClassCastException.
+ *
+ * @return the sum, boxed in the implementation's numeric type.
+ */
+ public abstract Object add(Object o1, Object o2);
+
+ /**
+  * Returns the adder for the given numeric class.
+  * <p>
+  * Both the boxed classes ({@code Integer.class}, ...) and their primitive
+  * class tokens ({@code int.class}, ...) are accepted. The adders only ever
+  * see boxed operands, and reflective access to primitive array elements
+  * or fields yields boxed values, so the same adder serves both tokens.
+  *
+  * @param clazz
+  *            Class of the values to sum.
+  * @return the adder for {@code clazz}.
+  * @throws RuntimeException
+  *             if {@code clazz} is not one of the supported numeric types.
+  */
+ public static SumFunction getForClass(Class<?> clazz) {
+
+     if (clazz == Integer.class || clazz == int.class) {
+         return new IntSum();
+     } else if (clazz == Long.class || clazz == long.class) {
+         return new LongSum();
+     } else if (clazz == Short.class || clazz == short.class) {
+         return new ShortSum();
+     } else if (clazz == Double.class || clazz == double.class) {
+         return new DoubleSum();
+     } else if (clazz == Float.class || clazz == float.class) {
+         return new FloatSum();
+     } else if (clazz == Byte.class || clazz == byte.class) {
+         return new ByteSum();
+     } else {
+         throw new RuntimeException("DataStream cannot be summed because the class "
+                 + clazz.getSimpleName() + " does not support the + operator.");
+     }
+ }
+
+ public static class IntSum extends SumFunction {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (Integer) value1 + (Integer) value2;
+ }
+ }
+
+ public static class LongSum extends SumFunction {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (Long) value1 + (Long) value2;
+ }
+ }
+
+ public static class DoubleSum extends SumFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (Double) value1 + (Double) value2;
+ }
+ }
+
+ public static class ShortSum extends SumFunction {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (short) ((Short) value1 + (Short) value2);
+ }
+ }
+
+ public static class FloatSum extends SumFunction {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (Float) value1 + (Float) value2;
+ }
+ }
+
+ public static class ByteSum extends SumFunction {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object add(Object value1, Object value2) {
+ return (byte) ((Byte) value1 + (Byte) value2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
new file mode 100644
index 0000000..1137dda
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Flat-map style function over two inputs of possibly different types that
+ * share a single output type.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+    /**
+     * Handles a value of the first input type; results are handed to
+     * {@code out}.
+     */
+    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
+
+    /**
+     * Handles a value of the second input type; results are handed to
+     * {@code out}.
+     */
+    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
new file mode 100644
index 0000000..67ab672
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Map style function over two inputs of possibly different types that share
+ * a single output type.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+    /**
+     * Maps a value of the first input type to the output type.
+     */
+    OUT map1(IN1 value);
+
+    /**
+     * Maps a value of the second input type to the output type.
+     */
+    OUT map2(IN2 value);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
new file mode 100644
index 0000000..4342dfd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoReduceFunction.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * The CoReduceFunction interface represents a Reduce transformation with two
+ * different input streams. The reduce1 function combines groups of elements of
+ * the first input with the same key to a single value, while reduce2 combines
+ * groups of elements of the second input with the same key to a single value.
+ * Each produced value is mapped to the same type by map1 and map2,
+ * respectively, to form one output stream.
+ * <p>
+ * The basic syntax for using a grouped CoReduceFunction is as follows:
+ *
+ * <pre>
+ * <blockquote>
+ * ConnectedDataStream&lt;X&gt; input = ...;
+ *
+ * ConnectedDataStream&lt;X&gt; result = input.groupBy(keyPosition1, keyPosition2)
+ * .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...);
+ * </blockquote>
+ * </pre>
+ *
+ * @param <IN1>
+ * Type of the first input.
+ * @param <IN2>
+ * Type of the second input.
+ * @param <OUT>
+ * Output type.
+ */
+public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+ /**
+ * The core method of CoReduceFunction for the first input, combining two
+ * values of the first input into one value of the same type. The reduce1
+ * function is consecutively applied to all values of a group until only a
+ * single value remains.
+ * <p>
+ * The signature declares no checked exceptions, so implementations can
+ * only signal failure with unchecked exceptions.
+ *
+ * @param value1
+ * The first value to combine.
+ * @param value2
+ * The second value to combine.
+ * @return The combined value of both input values.
+ */
+ public IN1 reduce1(IN1 value1, IN1 value2);
+
+ /**
+ * The core method of CoReduceFunction for the second input, combining two
+ * values of the second input into one value of the same type. The reduce2
+ * function is consecutively applied to all values of a group until only a
+ * single value remains.
+ * <p>
+ * The signature declares no checked exceptions, so implementations can
+ * only signal failure with unchecked exceptions.
+ *
+ * @param value1
+ * The first value to combine.
+ * @param value2
+ * The second value to combine.
+ * @return The combined value of both input values.
+ */
+ public IN2 reduce2(IN2 value1, IN2 value2);
+
+ /**
+ * Maps the reduced first input to the output type.
+ *
+ * @param value
+ * The reduced value of the first input.
+ * @return The value mapped to the output type.
+ */
+ public OUT map1(IN1 value);
+
+ /**
+ * Maps the reduced second input to the output type.
+ *
+ * @param value
+ * The reduced value of the second input.
+ * @return The value mapped to the output type.
+ */
+ public OUT map2(IN2 value);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
new file mode 100644
index 0000000..1e8d03c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CoWindowFunction.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Function over two lists of elements, one per input type, that hands its
+ * results to a collector.
+ *
+ * @param <IN1>
+ *            Element type of the first list.
+ * @param <IN2>
+ *            Element type of the second list.
+ * @param <O>
+ *            Type of the collected results.
+ */
+public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {
+
+    /**
+     * Processes the two lists and emits results through {@code out}.
+     * NOTE(review): presumably {@code first} and {@code second} hold the
+     * current window contents of the two inputs - confirm against the caller.
+     */
+    void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
new file mode 100644
index 0000000..e9c0169
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/CrossWindowFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link CoWindowFunction} that applies a {@link CrossFunction} to every
+ * combination of one element of the first list and one element of the second
+ * list, and emits each result.
+ *
+ * @param <IN1>
+ *            Type of the elements in the first list.
+ * @param <IN2>
+ *            Type of the elements in the second list.
+ * @param <OUT>
+ *            Type of the values produced by the cross function.
+ */
+public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    // Assigned once in the constructor and never replaced afterwards.
+    private final CrossFunction<IN1, IN2, OUT> crossFunction;
+
+    /**
+     * Creates a function that crosses two lists with the given cross function.
+     *
+     * @param crossFunction
+     *            Function applied to each pair of elements.
+     */
+    public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
+        this.crossFunction = crossFunction;
+    }
+
+    /**
+     * Emits {@code crossFunction.cross(a, b)} for every {@code a} in
+     * {@code first} and every {@code b} in {@code second}. The first list
+     * drives the outer loop, so nothing is emitted if either list is empty.
+     *
+     * @param first
+     *            Elements of the first input.
+     * @param second
+     *            Elements of the second input.
+     * @param out
+     *            Collector that receives one result per pair.
+     * @throws Exception
+     *             Propagated from the cross function.
+     */
+    @Override
+    public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
+        for (IN1 firstValue : first) {
+            for (IN2 secondValue : second) {
+                out.collect(crossFunction.cross(firstValue, secondValue));
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
new file mode 100644
index 0000000..6f658c6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/JoinWindowFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link CoWindowFunction} that joins two lists on a key: elements of the
+ * first list are grouped by {@code keySelector1}, then each element of the
+ * second list is looked up by {@code keySelector2} and joined with every
+ * element of the matching group.
+ *
+ * @param <IN1>
+ *            Type of the elements in the first list.
+ * @param <IN2>
+ *            Type of the elements in the second list.
+ * @param <OUT>
+ *            Type of the values produced by the join function.
+ */
+public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private KeySelector<IN1, ?> keySelector1;
+    private KeySelector<IN2, ?> keySelector2;
+    private JoinFunction<IN1, IN2, OUT> joinFunction;
+
+    /**
+     * @param keySelector1
+     *            Extracts the join key from elements of the first list.
+     * @param keySelector2
+     *            Extracts the join key from elements of the second list.
+     * @param joinFunction
+     *            Function applied to every pair of elements with equal keys.
+     */
+    public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
+            JoinFunction<IN1, IN2, OUT> joinFunction) {
+        this.keySelector1 = keySelector1;
+        this.keySelector2 = keySelector2;
+        this.joinFunction = joinFunction;
+    }
+
+    /**
+     * Emits one join result per pair of elements whose keys are equal.
+     * Results are produced in the order of the second list; within one
+     * element of the second list, in the order of the first list.
+     */
+    @Override
+    public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception {
+        Map<Object, List<IN1>> firstByKey = build(first);
+
+        for (IN2 right : second) {
+            Object key = keySelector2.getKey(right);
+            List<IN1> candidates = firstByKey.get(key);
+            if (candidates == null) {
+                // no element of the first list carries this key
+                continue;
+            }
+            for (IN1 left : candidates) {
+                out.collect(joinFunction.join(left, right));
+            }
+        }
+    }
+
+    /**
+     * Groups the given records by the key that {@code keySelector1} extracts,
+     * keeping the encounter order inside each group.
+     */
+    private Map<Object, List<IN1>> build(List<IN1> records) throws Exception {
+        Map<Object, List<IN1>> grouped = new HashMap<Object, List<IN1>>();
+
+        for (IN1 element : records) {
+            Object key = keySelector1.getKey(element);
+            List<IN1> bucket = grouped.get(key);
+            if (bucket == null) {
+                bucket = new LinkedList<IN1>();
+                grouped.put(key, bucket);
+            }
+            bucket.add(element);
+        }
+
+        return grouped;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
new file mode 100644
index 0000000..6746140
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Abstract base class for {@link CoFlatMapFunction} implementations, i.e.
+ * FlatMap transformations with two different input types, that also want the
+ * features provided by the {@link RichFunction} interface. Those features are
+ * inherited from {@link AbstractRichFunction}; this class declares no members
+ * of its own besides the serial version UID.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+ CoFlatMapFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
new file mode 100644
index 0000000..e561408
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Abstract base class for {@link CoMapFunction} implementations, i.e. Map
+ * transformations with two different input types, that also want the features
+ * provided by the {@link RichFunction} interface. Those features are inherited
+ * from {@link AbstractRichFunction}; this class declares no members of its own
+ * besides the serial version UID.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+ CoMapFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
new file mode 100644
index 0000000..d3e6f3a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoReduceFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Abstract base class for {@link CoReduceFunction} implementations, i.e.
+ * Reduce transformations with two different input types, that also want the
+ * features provided by the {@link RichFunction} interface. Those features are
+ * inherited from {@link AbstractRichFunction}; this class declares no members
+ * of its own besides the serial version UID.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+ CoReduceFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
new file mode 100644
index 0000000..e317065
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoWindowFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract base class for {@link CoWindowFunction} implementations that also
+ * extend {@link AbstractRichFunction}.
+ *
+ * @param <IN1>
+ *            Type of the elements in the first list.
+ * @param <IN2>
+ *            Type of the elements in the second list.
+ * @param <O>
+ *            Type of the emitted results.
+ */
+public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
+ CoWindowFunction<IN1, IN2, O> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Redeclares the interface method as abstract; subclasses must implement it.
+ *
+ * @param first
+ *            Elements of the first input.
+ * @param second
+ *            Elements of the second input.
+ * @param out
+ *            Collector that receives the results.
+ * @throws Exception
+ *             Implementations may throw any exception to signal a failure.
+ */
+ @Override
+ public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
+ throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
new file mode 100644
index 0000000..031029f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the SinkFunction writing tuples in the specified
+ * OutputFormat format. Tuples are collected to a list and written through the
+ * format whenever {@link #updateCondition()} holds, and once more on
+ * {@link #close()} if the list is not empty. The target path and the overwrite
+ * mode are pre-packaged in format.
+ * <p>
+ * Failures while writing or closing are logged and trigger the format's
+ * cleanup hook at most once; they are not rethrown.
+ *
+ * @param <IN>
+ *            Input type
+ */
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
+    /** Records buffered since the last flush. */
+    protected ArrayList<IN> tupleList = new ArrayList<IN>();
+    protected volatile OutputFormat<IN> format;
+    /** Set once the format's cleanup hook has been invoked, so it runs at most once. */
+    protected volatile boolean cleanupCalled = false;
+    protected int indexInSubtaskGroup;
+    protected int currentNumberOfSubtasks;
+
+    /**
+     * @param format
+     *            Output format that the buffered records are written with.
+     */
+    public FileSinkFunction(OutputFormat<IN> format) {
+        this.format = format;
+    }
+
+    /**
+     * Configures the format with the task stub parameters and opens it for
+     * this subtask.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+        format.configure(context.getTaskStubParameters());
+        indexInSubtaskGroup = context.getIndexOfThisSubtask();
+        currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
+        format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
+    }
+
+    /**
+     * Buffers the record and flushes the buffer if {@link #updateCondition()}
+     * holds afterwards.
+     */
+    @Override
+    public void invoke(IN record) throws Exception {
+        tupleList.add(record);
+        if (updateCondition()) {
+            flush();
+        }
+    }
+
+    /**
+     * Flushes any remaining records and closes the format. A failure while
+     * closing is logged and followed by the cleanup hook instead of being
+     * rethrown.
+     */
+    @Override
+    public void close() throws IOException {
+        if (!tupleList.isEmpty()) {
+            flush();
+        }
+        try {
+            format.close();
+        } catch (Exception ex) {
+            // Previously dropped silently; keep the best-effort behaviour but leave a trace.
+            LOG.error("Closing the output format failed.", ex);
+            cleanupOnError();
+        }
+    }
+
+    /**
+     * Writes all buffered records through the format and then calls
+     * {@link #resetParameters()}, whether or not writing succeeded. A write
+     * failure is logged and followed by the cleanup hook instead of being
+     * rethrown.
+     */
+    protected void flush() {
+        try {
+            for (IN rec : tupleList) {
+                format.writeRecord(rec);
+            }
+        } catch (Exception ex) {
+            // Previously dropped silently, which hid the loss of the buffered records.
+            LOG.error("Writing the buffered records failed.", ex);
+            cleanupOnError();
+        }
+        resetParameters();
+    }
+
+    /**
+     * Invokes the format's cleanup hook if the format supports it and the
+     * hook has not been invoked before. Never throws.
+     */
+    private void cleanupOnError() {
+        try {
+            if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+                cleanupCalled = true;
+                ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+            }
+        } catch (Throwable t) {
+            LOG.error("Cleanup on error failed.", t);
+        }
+    }
+
+    /**
+     * Condition for writing the contents of tupleList and clearing it.
+     *
+     * @return value of the updating condition
+     */
+    protected abstract boolean updateCondition();
+
+    /**
+     * Statements to be executed after writing a batch go here. This class
+     * never clears tupleList itself, so implementations are presumably
+     * expected to do so here.
+     */
+    protected abstract void resetParameters();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
new file mode 100644
index 0000000..86bbb53
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunctionByMillis.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.io.OutputFormat;
+
+/**
+ * Implementation of FileSinkFunction. Writes tuples to file in every millis
+ * milliseconds.
+ *
+ * @param <IN>
+ * Input type
+ */
+public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private final long millis;
+ private long lastTime;
+
+ public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) {
+ super(format);
+ this.millis = millis;
+ lastTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Condition for writing the contents of tupleList and clearing it.
+ *
+ * @return value of the updating condition
+ */
+ @Override
+ protected boolean updateCondition() {
+ return System.currentTimeMillis() - lastTime >= millis;
+ }
+
+ /**
+ * Statements to be executed after writing a batch goes here.
+ */
+ @Override
+ protected void resetParameters() {
+ tupleList.clear();
+ lastTime = System.currentTimeMillis();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
new file mode 100644
index 0000000..5a9c7a8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+/**
+ * SinkFunction that prints every record, via its {@code toString()}, to the
+ * standard output or the standard error stream. When the sink runs with a
+ * parallelism above one, each line is prefixed with the 1-based subtask index
+ * followed by {@code "> "}.
+ *
+ * @param <IN>
+ *            Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private static final boolean STD_OUT = false;
+    private static final boolean STD_ERR = true;
+
+    private boolean target;
+    private transient PrintStream stream;
+    private transient String prefix;
+
+    /**
+     * Creates a print sink that writes to standard out.
+     */
+    public PrintSinkFunction() {}
+
+    /**
+     * Creates a print sink that writes to standard out or standard error.
+     *
+     * @param stdErr
+     *            True to print to standard error, false to print to standard out.
+     */
+    public PrintSinkFunction(boolean stdErr) {
+        target = stdErr;
+    }
+
+    /** Selects standard out as the target stream; takes effect on the next {@code open}. */
+    public void setTargetToStandardOut() {
+        target = STD_OUT;
+    }
+
+    /** Selects standard error as the target stream; takes effect on the next {@code open}. */
+    public void setTargetToStandardErr() {
+        target = STD_ERR;
+    }
+
+    /**
+     * Resolves the target stream and computes the line prefix for this subtask.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+
+        if (target == STD_OUT) {
+            stream = System.out;
+        } else {
+            stream = System.err;
+        }
+
+        // only parallel sinks need to tell their output lines apart
+        if (context.getNumberOfParallelSubtasks() > 1) {
+            prefix = (context.getIndexOfThisSubtask() + 1) + "> ";
+        } else {
+            prefix = null;
+        }
+    }
+
+    /**
+     * Prints one line for the record, with the subtask prefix if there is one.
+     */
+    @Override
+    public void invoke(IN record) {
+        String line = record.toString();
+        stream.println(prefix != null ? prefix + line : line);
+    }
+
+    /**
+     * Drops the references to the stream and the prefix.
+     */
+    @Override
+    public void close() {
+        this.stream = null;
+        this.prefix = null;
+    }
+
+    @Override
+    public String toString() {
+        String targetName = (target == STD_ERR) ? "System.err" : "System.out";
+        return "Print to " + targetName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
new file mode 100644
index 0000000..7853758
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Abstract base class for {@link SinkFunction} implementations that also
+ * extend {@link AbstractRichFunction}.
+ *
+ * @param <IN>
+ *            Input type
+ */
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Redeclares {@link SinkFunction#invoke(Object)} as abstract; subclasses
+ * must implement it.
+ *
+ * @param value
+ *            The input record.
+ * @throws Exception
+ *             Implementations may throw any exception to signal a failure.
+ */
+ public abstract void invoke(IN value) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
new file mode 100644
index 0000000..9c7ceeb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface for implementing user defined sink functionality.
+ *
+ * @param <IN> Input type parameter.
+ */
+public interface SinkFunction<IN> extends Function, Serializable {
+
+ /**
+ * Function for standard sink behaviour. This function is called for every record.
+ *
+ * @param value The input record.
+ * @throws Exception Implementations may throw any exception to signal that
+ *             the record could not be processed.
+ */
+ public void invoke(IN value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
new file mode 100644
index 0000000..cd6c21c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+ private final String hostName;
+ private final int port;
+ private final SerializationSchema<IN, byte[]> schema;
+ private transient Socket client;
+ private transient DataOutputStream dataOutputStream;
+
+ /**
+ * Default constructor.
+ *
+ * @param hostName Host of the Socket server.
+ * @param port Port of the Socket.
+ * @param schema Schema of the data.
+ */
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ this.hostName = hostName;
+ this.port = port;
+ this.schema = schema;
+ }
+
+ /**
+ * Initializes the connection to Socket.
+ */
+ public void intializeConnection() {
+ OutputStream outputStream;
+ try {
+ client = new Socket(hostName, port);
+ outputStream = client.getOutputStream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ dataOutputStream = new DataOutputStream(outputStream);
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Socket.
+ *
+ * @param value
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN value) {
+ byte[] msg = schema.serialize(value);
+ try {
+ dataOutputStream.write(msg);
+ } catch (IOException e) {
+ if(LOG.isErrorEnabled()){
+ LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+ }
+ }
+ }
+
+ /**
+ * Closes the connection of the Socket client.
+ */
+ private void closeConnection(){
+ try {
+ dataOutputStream.flush();
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while closing connection with socket server at "
+ + hostName + ":" + port, e);
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close connection with socket server at "
+ + hostName + ":" + port, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize the connection with the Socket in the server.
+ * @param parameters Configuration.
+ */
+ @Override
+ public void open(Configuration parameters) {
+ intializeConnection();
+ }
+
+ /**
+ * Closes the connection with the Socket server.
+ */
+ @Override
+ public void close() {
+ closeConnection();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
new file mode 100644
index 0000000..019d35f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * Abstract class for formatting the output of the writeAsText and writeAsCsv
+ * functions. A format receives a batch of collected elements together with the
+ * target path and is responsible for persisting them. The class is
+ * {@link Serializable}.
+ *
+ * @param <IN>
+ * Input tuple type
+ */
+public abstract class WriteFormat<IN> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Writes the contents of tupleList to the file specified by path.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param tupleList
+ * is the list of tuples to be written; implementations are not
+ * expected to clear it
+ */
+ protected abstract void write(String path, ArrayList<IN> tupleList);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
new file mode 100644
index 0000000..bfae653
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in csv format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Appends one line per element of tupleList to the file at path. Each line
+     * is the element's {@code toString()} with its first and last character
+     * removed (presumably the enclosing parentheses of a tuple's string form -
+     * verify against the element types actually used).
+     *
+     * <p>The writer is closed in a {@code finally} block so the file handle is
+     * released even when an element is {@code null} or its string form is too
+     * short for the substring call.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param tupleList
+     *            is the list of tuples to be written
+     * @throws RuntimeException
+     *             if the file cannot be opened for appending
+     */
+    @Override
+    protected void write(String path, ArrayList<IN> tupleList) {
+        PrintWriter outStream = null;
+        try {
+            // Second FileWriter argument 'true' selects append mode.
+            outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+            for (IN tupleToWrite : tupleList) {
+                String strTuple = tupleToWrite.toString();
+                outStream.println(strTuple.substring(1, strTuple.length() - 1));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Exception occured while writing file " + path, e);
+        } finally {
+            if (outStream != null) {
+                outStream.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
new file mode 100644
index 0000000..03fcb5c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Writes tuples in text format.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteFormatAsText<IN> extends WriteFormat<IN> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Appends one line per element of tupleList to the file at path, using the
+     * element's string form as produced by {@link PrintWriter#println(Object)}.
+     *
+     * <p>The writer is closed in a {@code finally} block so the file handle is
+     * released even when writing an element throws.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param tupleList
+     *            is the list of tuples to be written
+     * @throws RuntimeException
+     *             if the file cannot be opened for appending
+     */
+    @Override
+    public void write(String path, ArrayList<IN> tupleList) {
+        PrintWriter outStream = null;
+        try {
+            // Second FileWriter argument 'true' selects append mode.
+            outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)));
+            for (IN tupleToWrite : tupleList) {
+                outStream.println(tupleToWrite);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Exception occured while writing file " + path, e);
+        } finally {
+            if (outStream != null) {
+                outStream.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
new file mode 100644
index 0000000..b25806d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+
+/**
+ * Base SinkFunction that buffers incoming tuples in a list and hands the list
+ * to a {@link WriteFormat} whenever {@link #updateCondition()} holds. The
+ * target file is created, or emptied if it already exists, when the sink is
+ * constructed.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    protected final String path;
+    protected ArrayList<IN> tupleList = new ArrayList<IN>();
+    protected WriteFormat<IN> format;
+
+    /**
+     * Stores the target path and format, then empties the target file.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param format
+     *            is the format used to write the buffered tuples
+     */
+    public WriteSinkFunction(String path, WriteFormat<IN> format) {
+        this.path = path;
+        this.format = format;
+        cleanFile(path);
+    }
+
+    /**
+     * Creates the target file if it is missing and truncates it otherwise.
+     *
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @throws RuntimeException
+     *             if the file cannot be opened for writing
+     */
+    protected void cleanFile(String path) {
+        PrintWriter truncator;
+        try {
+            truncator = new PrintWriter(path);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("File not found " + path, e);
+        }
+        truncator.print("");
+        truncator.close();
+    }
+
+    /**
+     * Decides whether the buffered tuples should be written out now.
+     *
+     * @return value of the updating condition
+     */
+    protected abstract boolean updateCondition();
+
+    /**
+     * Hook executed right after a batch has been handed to the format.
+     */
+    protected abstract void resetParameters();
+
+    /**
+     * Buffers the incoming tuple in tupleList. When updateCondition() is true,
+     * passes the list to the format for writing to the target file and then
+     * calls resetParameters().
+     *
+     * @param tuple
+     *            the incoming tuple
+     */
+    @Override
+    public void invoke(IN tuple) {
+        tupleList.add(tuple);
+        if (!updateCondition()) {
+            return;
+        }
+        format.write(path, tupleList);
+        resetParameters();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
new file mode 100644
index 0000000..0364174
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+/**
+ * WriteSinkFunction that writes out its buffered tuples once at least millis
+ * milliseconds have passed since the previous write (or since construction).
+ * The check runs only when a tuple arrives.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private final long millis;
+    private long lastTime;
+
+    /**
+     * @param path
+     *            is the path to the location where the tuples are written
+     * @param format
+     *            is the format used to write the buffered tuples
+     * @param millis
+     *            minimum number of milliseconds between two writes
+     */
+    public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) {
+        super(path, format);
+        this.millis = millis;
+        this.lastTime = System.currentTimeMillis();
+    }
+
+    /**
+     * @return true once the configured interval has elapsed since lastTime
+     */
+    @Override
+    protected boolean updateCondition() {
+        final long elapsed = System.currentTimeMillis() - lastTime;
+        return elapsed >= millis;
+    }
+
+    /**
+     * Empties the buffer and restarts the interval from the current time.
+     */
+    @Override
+    protected void resetParameters() {
+        tupleList.clear();
+        lastTime = System.currentTimeMillis();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
new file mode 100644
index 0000000..68ff532
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
+
+ public enum WatchType {
+ ONLY_NEW_FILES, // Only new files will be processed.
+ REPROCESS_WITH_APPENDED, // When some files are appended, all contents
+ // of the files will be processed.
+ PROCESS_ONLY_APPENDED // When some files are appended, only appended
+ // contents will be processed.
+ }
+
+ private String path;
+ private long interval;
+ private WatchType watchType;
+
+ private FileSystem fileSystem;
+ private Map<String, Long> offsetOfFiles;
+ private Map<String, Long> modificationTimes;
+
+ private volatile boolean isRunning = false;
+
+ public FileMonitoringFunction(String path, long interval, WatchType watchType) {
+ this.path = path;
+ this.interval = interval;
+ this.watchType = watchType;
+ this.modificationTimes = new HashMap<String, Long>();
+ this.offsetOfFiles = new HashMap<String, Long>();
+ }
+
+ @Override
+ public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+ isRunning = true;
+ fileSystem = FileSystem.get(new URI(path));
+
+ while (isRunning) {
+ List<String> files = listNewFiles();
+ for (String filePath : files) {
+ if (watchType == WatchType.ONLY_NEW_FILES
+ || watchType == WatchType.REPROCESS_WITH_APPENDED) {
+ collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
+ offsetOfFiles.put(filePath, -1L);
+ } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
+ long offset = 0;
+ long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
+ if (offsetOfFiles.containsKey(filePath)) {
+ offset = offsetOfFiles.get(filePath);
+ }
+
+ collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
+ offsetOfFiles.put(filePath, fileSize);
+
+ LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
+ }
+ }
+
+ Thread.sleep(interval);
+ }
+ }
+
+ private List<String> listNewFiles() throws IOException {
+ List<String> files = new ArrayList<String>();
+
+ FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+
+ for (FileStatus status : statuses) {
+ Path filePath = status.getPath();
+ String fileName = filePath.getName();
+ long modificationTime = status.getModificationTime();
+
+ if (!isFiltered(fileName, modificationTime)) {
+ files.add(filePath.toString());
+ modificationTimes.put(fileName, modificationTime);
+ }
+ }
+ return files;
+ }
+
+ private boolean isFiltered(String fileName, long modificationTime) {
+
+ if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName))
+ || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
+ return true;
+ } else {
+ Long lastModification = modificationTimes.get(fileName);
+ if (lastModification == null) {
+ return false;
+ } else {
+ return lastModification >= modificationTime;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
new file mode 100644
index 0000000..945f953
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+
+/**
+ * Reads lines from the file described by an incoming
+ * (file path, start offset, end offset) tuple and emits each line.
+ */
+public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Opens the file named by {@code value.f0}, seeks to {@code value.f1} and
+     * emits lines until the end of the file, or until the stream position
+     * passes {@code value.f2} when that end offset is not -1.
+     *
+     * <p>The stream is closed in a {@code finally} block that also covers the
+     * seek and the reader construction, so a failing seek does not leak the
+     * opened stream.
+     *
+     * @param value
+     *            (file path, start offset, end offset); an end offset of -1
+     *            disables the upper bound
+     * @param out
+     *            collector receiving the lines
+     */
+    @Override
+    public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
+        FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
+
+        try {
+            stream.seek(value.f1);
+
+            // NOTE(review): the reader uses the platform default charset.
+            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
+            String line;
+
+            // NOTE(review): getPos() is read from the underlying stream while
+            // lines come from a buffered reader, so the position may be ahead
+            // of the line just returned - confirm the bound is precise enough.
+            while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
+                out.collect(line);
+            }
+        } finally {
+            // Closing the stream releases the only resource the reader wraps.
+            stream.close();
+        }
+    }
+}