You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:56 UTC

[23/28] git commit: [streaming] GroupReduceInvokable update

[streaming] GroupReduceInvokable update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f149197f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f149197f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f149197f

Branch: refs/heads/master
Commit: f149197f952c85d5eeb643b91aeac4f26fd573a4
Parents: 723cb27
Author: szape <ne...@gmail.com>
Authored: Thu Aug 28 13:07:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../api/datastream/ConnectedDataStream.java     | 26 --------
 .../api/function/co/CoGroupReduceFunction.java  | 31 +++++++++
 .../api/function/co/CoReduceFunction.java       |  8 +--
 .../operator/co/CoFlatMapInvokable.java         | 28 ++------
 .../operator/co/CoGroupReduceInvokable.java     | 52 +++++++--------
 .../api/invokable/operator/co/CoInvokable.java  | 37 +++++++++--
 .../invokable/operator/co/CoMapInvokable.java   | 32 +++------
 .../operator/co/CoReduceInvokable.java          | 70 --------------------
 .../operator/co/CoStreamReduceInvokable.java    | 70 ++++++++++++++++++++
 .../invokable/operator/CoGroupReduceTest.java   |  2 +-
 10 files changed, 177 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index bd0c607..920278c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -27,12 +27,10 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
 
@@ -154,30 +152,6 @@ public class ConnectedDataStream<IN1, IN2> {
 		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
 				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
 	}
-	
-	/**
-	 * Applies a CoReduce transformation on the data stream. The transformation calls
-	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
-	 * each element of the first input and {@link CoReduceFunction#reduce2} and
-	 * {@link CoReduceFunction#map2} for each element of the second input. 
-	 * 
-	 * @param coReducer
-	 *            The {@link CoReduceFunction} that will be called for every two
-	 *            element of each input DataStream.
-	 * @return The transformed DataStream.
-	 */
-	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-
-		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
-				CoReduceFunction.class, 0);
-		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
-				CoReduceFunction.class, 1);
-		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
-				CoReduceFunction.class, 2);
-
-		return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
-				new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
-	}
 
 	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
 			final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
new file mode 100644
index 0000000..ba31ea9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+public interface CoGroupReduceFunction<IN1, IN2, OUT> extends Function, Serializable { // IN1/IN2: element types handed to reduce1/reduce2; OUT: element type of the Collector
+
+	void reduce1(Iterable<IN1> values, Collector<OUT> out) throws Exception; // first-input callback; presumably emits its results through out - confirm against the invokable that drives it
+	
+	void reduce2(Iterable<IN2> values, Collector<OUT> out) throws Exception; // second-input counterpart of reduce1, same shape with IN2 values
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
index fa58991..92b66c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
@@ -67,7 +67,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
 	 *             This method may throw exceptions. Throwing an exception will
 	 *             cause the operation to fail and may trigger recovery.
 	 */
-	public abstract IN1 reduce1(IN1 value1, IN1 value2);
+	public IN1 reduce1(IN1 value1, IN1 value2);
 
 	/**
 	 * The core method of ReduceFunction, combining two values of the second
@@ -85,7 +85,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
 	 *             This method may throw exceptions. Throwing an exception will
 	 *             cause the operation to fail and may trigger recovery.
 	 */
-	public abstract IN2 reduce2(IN2 value1, IN2 value2);
+	public IN2 reduce2(IN2 value1, IN2 value2);
 
 	/**
 	 * Maps the reduced first input to the output type.
@@ -97,7 +97,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @param <OUT>
 	 *            Output type.
 	 */
-	public abstract OUT map1(IN1 value);
+	public OUT map1(IN1 value);
 
 	/**
 	 * Maps the reduced second input to the output type.
@@ -109,5 +109,5 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @param <OUT>
 	 *            Output type.
 	 */
-	public abstract OUT map2(IN2 value);
+	public OUT map2(IN2 value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
index bde9368..9561e74 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
 
 public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -33,38 +31,24 @@ public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT
 
 	@Override
 	public void handleStream1() throws Exception {
-		flatMapper.flatMap1(reuse1.getObject(), collector);
+		callUserFunctionAndLogException1();
 	}
 
 	@Override
 	public void handleStream2() throws Exception {
-		flatMapper.flatMap2(reuse2.getObject(), collector);
+		callUserFunctionAndLogException2();
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (flatMapper instanceof RichFunction) {
-			((RichFunction) flatMapper).open(parameters);
-		}
-	}
+	protected void callUserFunction1() throws Exception {
+		flatMapper.flatMap1(reuse1.getObject(), collector);
 
-	@Override
-	public void close() throws Exception {
-		if (flatMapper instanceof RichFunction) {
-			((RichFunction) flatMapper).close();
-		}
 	}
 
 	@Override
-	protected void coUSerFunction1() throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
+	protected void callUserFunction2() throws Exception {
+		flatMapper.flatMap2(reuse2.getObject(), collector);
 
-	@Override
-	protected void coUserFunction2() throws Exception {
-		// TODO Auto-generated method stub
-		
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index e5f4059..d7ad32e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -20,15 +20,15 @@ package org.apache.flink.streaming.api.invokable.operator.co;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.state.MutableTableState;
 
-public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoStreamReduceInvokable<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private CoReduceFunction<IN1, IN2, OUT> coReducer;
 	private int keyPosition1;
-	private MutableTableState<Object, IN1> values1;
-
 	private int keyPosition2;
+	private MutableTableState<Object, IN1> values1;
 	private MutableTableState<Object, IN2> values2;
+	IN1 reduced1;
+	IN2 reduced2;
 
 	public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
 			int keyPosition2) {
@@ -43,43 +43,43 @@ public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
 	@Override
 	public void handleStream1() throws Exception {
 		Object key = reuse1.getField(keyPosition1);
-		IN1 currentValue = values1.get(key);
-		IN1 nextValue = reuse1.getObject();
-		if (currentValue != null) {
-			IN1 reduced = coReducer.reduce1(currentValue, nextValue);
-			values1.put(key, reduced);
-			collector.collect(coReducer.map1(reduced));
+		currentValue1 = values1.get(key);
+		nextValue1 = reuse1.getObject();
+		if (currentValue1 != null) {
+			callUserFunctionAndLogException1();
+			values1.put(key, reduced1);
+			collector.collect(coReducer.map1(reduced1));
 		} else {
-			values1.put(key, nextValue);
-			collector.collect(coReducer.map1(nextValue));
+			values1.put(key, nextValue1);
+			collector.collect(coReducer.map1(nextValue1));
 		}
 	}
 
 	@Override
 	public void handleStream2() throws Exception {
 		Object key = reuse2.getField(keyPosition2);
-		IN2 currentValue = values2.get(key);
-		IN2 nextValue = reuse2.getObject();
-		if (currentValue != null) {
-			IN2 reduced = coReducer.reduce2(currentValue, nextValue);
-			values2.put(key, reduced);
-			collector.collect(coReducer.map2(reduced));
+		currentValue2 = values2.get(key);
+		nextValue2 = reuse2.getObject();
+		if (currentValue2 != null) {
+			callUserFunctionAndLogException2();
+			values2.put(key, reduced2);
+			collector.collect(coReducer.map2(reduced2));
 		} else {
-			values2.put(key, nextValue);
-			collector.collect(coReducer.map2(nextValue));
+			values2.put(key, nextValue2);
+			collector.collect(coReducer.map2(nextValue2));
 		}
 	}
 
 	@Override
-	protected void coUSerFunction1() throws Exception {
-		// TODO Auto-generated method stub
-		
+	protected void callUserFunction1() throws Exception {
+		reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+
 	}
 
 	@Override
-	protected void coUserFunction2() throws Exception {
-		// TODO Auto-generated method stub
-		
+	protected void callUserFunction2() throws Exception {
+		reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 585bd72..e9fefc3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,12 +17,15 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
 
 public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 
@@ -31,6 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	}
 
 	private static final long serialVersionUID = 1L;
+	private static final Log LOG = LogFactory.getLog(CoInvokable.class);
 
 	protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
 	protected StreamRecord<IN1> reuse1;
@@ -67,7 +71,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 		this.reuse2 = serializer2.createInstance();
 	}
 
-	
 	public void invoke() throws Exception {
 		if (this.isMutable) {
 			mutableInvoke();
@@ -75,7 +78,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			immutableInvoke();
 		}
 	}
-	
+
 	protected void immutableInvoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
@@ -90,7 +93,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 			}
 		}
 	}
-	
+
 	protected void mutableInvoke() throws Exception {
 		while (true) {
 			int next = recordIterator.next(reuse1, reuse2);
@@ -107,9 +110,31 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
 	protected abstract void handleStream1() throws Exception;
 
 	protected abstract void handleStream2() throws Exception;
+
+	protected abstract void callUserFunction1() throws Exception;
+
+	protected abstract void callUserFunction2() throws Exception;
 	
-	protected abstract void coUSerFunction1() throws Exception;
-	
-	protected abstract void coUserFunction2() throws Exception;
+	protected void callUserFunctionAndLogException1() { // runs callUserFunction1(); a failure is logged at error level and not rethrown
+		try {
+			callUserFunction1();
+		} catch (Exception e) { // the exception stops here, so this method always returns normally
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s",
+						StringUtils.stringifyException(e)));
+			}
+		}
+	}
+
+	protected void callUserFunctionAndLogException2() { // runs callUserFunction2(); a failure is logged at error level and not rethrown
+		try {
+			callUserFunction2();
+		} catch (Exception e) { // the exception stops here, so this method always returns normally
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s",
+						StringUtils.stringifyException(e)));
+			}
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 6179d2e..6c179e7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 
 public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -30,41 +28,27 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 		super(mapper);
 		this.mapper = mapper;
 	}
-	
+
 	@Override
 	public void handleStream1() throws Exception {
-		collector.collect(mapper.map1(reuse1.getObject()));
+		callUserFunctionAndLogException1();
 	}
-	
+
 	@Override
 	public void handleStream2() throws Exception {
-		collector.collect(mapper.map2(reuse2.getObject()));
+		callUserFunctionAndLogException2();
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		if (mapper instanceof RichFunction) {
-			((RichFunction) mapper).open(parameters);
-		}
-	}
+	protected void callUserFunction1() throws Exception {
+		collector.collect(mapper.map1(reuse1.getObject()));
 
-	@Override
-	public void close() throws Exception {
-		if (mapper instanceof RichFunction) {
-			((RichFunction) mapper).close();
-		}
 	}
 
 	@Override
-	protected void coUSerFunction1() throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
+	protected void callUserFunction2() throws Exception {
+		collector.collect(mapper.map2(reuse2.getObject()));
 
-	@Override
-	protected void coUserFunction2() throws Exception {
-		// TODO Auto-generated method stub
-		
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100755
index 4a4a280..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private CoReduceFunction<IN1, IN2, OUT> coReducer;
-	private IN1 currentValue1 = null;
-	private IN2 currentValue2 = null;
-
-	public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		super(coReducer);
-		this.coReducer = coReducer;
-	}
-
-	@Override
-	public void handleStream1() throws Exception {
-		IN1 nextValue = reuse1.getObject();
-		if (currentValue1 != null) {
-			currentValue1 = coReducer.reduce1(currentValue1, nextValue);
-			collector.collect(coReducer.map1(currentValue1));
-		} else {
-			currentValue1 = nextValue;
-			collector.collect(coReducer.map1(nextValue));
-		}
-	}
-
-	@Override
-	public void handleStream2() throws Exception {
-		IN2 nextValue = reuse2.getObject();
-		if (currentValue2 != null) {
-			currentValue2 = coReducer.reduce2(currentValue2, nextValue);
-			collector.collect(coReducer.map2(currentValue2));
-		} else {
-			currentValue2 = nextValue;
-			collector.collect(coReducer.map2(nextValue));
-		}
-	}
-
-	@Override
-	protected void coUSerFunction1() throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	protected void coUserFunction2() throws Exception {
-		// TODO Auto-generated method stub
-		
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
new file mode 100644
index 0000000..e4acb99
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+
+public abstract class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> { // keeps one running reduce result per input and emits it, mapped to OUT, after every element
+	private static final long serialVersionUID = 1L;
+
+	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
+	protected IN1 currentValue1 = null; // running aggregate of input 1; null until its first element arrives
+	protected IN2 currentValue2 = null; // running aggregate of input 2; null until its first element arrives
+	protected IN1 nextValue1 = null; // element most recently read from input 1
+	protected IN2 nextValue2 = null; // element most recently read from input 2
+
+	public CoStreamReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+		super(coReducer);
+		this.coReducer = coReducer;
+		currentValue1 = null;
+		currentValue2 = null;
+	}
+
+	@Override
+	public void handleStream1() throws Exception {
+		nextValue1 = reuse1.getObject();
+		callUserFunctionAndLogException1(); // user-function failures are logged by the wrapper, not propagated
+	}
+
+	@Override
+	public void handleStream2() throws Exception {
+		nextValue2 = reuse2.getObject();
+		callUserFunctionAndLogException2(); // user-function failures are logged by the wrapper, not propagated
+	}
+
+	@Override
+	protected void callUserFunction1() throws Exception { // folds nextValue1 into the input-1 aggregate, then emits it
+		if (currentValue1 != null) {
+			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
+		} else {
+			currentValue1 = nextValue1; // first element seeds the aggregate
+		}
+		collector.collect(coReducer.map1(currentValue1));
+	}
+
+	@Override
+	protected void callUserFunction2() throws Exception { // folds nextValue2 into the input-2 aggregate, then emits it
+		if (currentValue2 != null) {
+			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
+		} else {
+			currentValue2 = nextValue2; // first element seeds the aggregate
+		}
+		collector.collect(coReducer.map2(currentValue2)); // emit the aggregate, not the raw element, matching callUserFunction1
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
index e1d10b8..bf9a772 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -60,7 +60,7 @@ public class CoGroupReduceTest {
 
 	@SuppressWarnings("unchecked")
 	@Test
-	public void coFlatMapTest() {
+	public void coGroupReduceTest() {
 		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
 		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
 		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");