You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:56 UTC
[23/28] git commit: [streaming] GroupReduceInvokable update
[streaming] GroupReduceInvokable update
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f149197f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f149197f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f149197f
Branch: refs/heads/master
Commit: f149197f952c85d5eeb643b91aeac4f26fd573a4
Parents: 723cb27
Author: szape <ne...@gmail.com>
Authored: Thu Aug 28 13:07:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 26 --------
.../api/function/co/CoGroupReduceFunction.java | 31 +++++++++
.../api/function/co/CoReduceFunction.java | 8 +--
.../operator/co/CoFlatMapInvokable.java | 28 ++------
.../operator/co/CoGroupReduceInvokable.java | 52 +++++++--------
.../api/invokable/operator/co/CoInvokable.java | 37 +++++++++--
.../invokable/operator/co/CoMapInvokable.java | 32 +++------
.../operator/co/CoReduceInvokable.java | 70 --------------------
.../operator/co/CoStreamReduceInvokable.java | 70 ++++++++++++++++++++
.../invokable/operator/CoGroupReduceTest.java | 2 +-
10 files changed, 177 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index bd0c607..920278c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -27,12 +27,10 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
@@ -154,30 +152,6 @@ public class ConnectedDataStream<IN1, IN2> {
return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}
-
- /**
- * Applies a CoReduce transformation on the data stream. The transformation calls
- * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
- * each element of the first input and {@link CoReduceFunction#reduce2} and
- * {@link CoReduceFunction#map2} for each element of the second input.
- *
- * @param coReducer
- * The {@link CoReduceFunction} that will be called for every two
- * element of each input DataStream.
- * @return The transformed DataStream.
- */
- public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-
- FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
- CoReduceFunction.class, 0);
- FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
- CoReduceFunction.class, 1);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
- CoReduceFunction.class, 2);
-
- return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
- new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
- }
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
new file mode 100644
index 0000000..ba31ea9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoGroupReduceFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+public interface CoGroupReduceFunction<IN1, IN2, OUT> extends Function, Serializable {
+ /** Takes an Iterable of IN1 values and a Collector of OUT; presumably called per group of the first input (confirm in the invokable). */
+ void reduce1(Iterable<IN1> values, Collector<OUT> out) throws Exception;
+ /** Takes an Iterable of IN2 values and a Collector of OUT; presumably called per group of the second input (confirm in the invokable). */
+ void reduce2(Iterable<IN2> values, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
index fa58991..92b66c6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java
@@ -67,7 +67,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
* This method may throw exceptions. Throwing an exception will
* cause the operation to fail and may trigger recovery.
*/
- public abstract IN1 reduce1(IN1 value1, IN1 value2);
+ public IN1 reduce1(IN1 value1, IN1 value2);
/**
* The core method of ReduceFunction, combining two values of the second
@@ -85,7 +85,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
* This method may throw exceptions. Throwing an exception will
* cause the operation to fail and may trigger recovery.
*/
- public abstract IN2 reduce2(IN2 value1, IN2 value2);
+ public IN2 reduce2(IN2 value1, IN2 value2);
/**
* Maps the reduced first input to the output type.
@@ -97,7 +97,7 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
* @param <OUT>
* Output type.
*/
- public abstract OUT map1(IN1 value);
+ public OUT map1(IN1 value);
/**
* Maps the reduced second input to the output type.
@@ -109,5 +109,5 @@ public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable
* @param <OUT>
* Output type.
*/
- public abstract OUT map2(IN2 value);
+ public OUT map2(IN2 value);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
index bde9368..9561e74 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.invokable.operator.co;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -33,38 +31,24 @@ public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT
@Override
public void handleStream1() throws Exception {
- flatMapper.flatMap1(reuse1.getObject(), collector);
+ callUserFunctionAndLogException1();
}
@Override
public void handleStream2() throws Exception {
- flatMapper.flatMap2(reuse2.getObject(), collector);
+ callUserFunctionAndLogException2();
}
@Override
- public void open(Configuration parameters) throws Exception {
- if (flatMapper instanceof RichFunction) {
- ((RichFunction) flatMapper).open(parameters);
- }
- }
+ protected void callUserFunction1() throws Exception {
+ flatMapper.flatMap1(reuse1.getObject(), collector);
- @Override
- public void close() throws Exception {
- if (flatMapper instanceof RichFunction) {
- ((RichFunction) flatMapper).close();
- }
}
@Override
- protected void coUSerFunction1() throws Exception {
- // TODO Auto-generated method stub
-
- }
+ protected void callUserFunction2() throws Exception {
+ flatMapper.flatMap2(reuse2.getObject(), collector);
- @Override
- protected void coUserFunction2() throws Exception {
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index e5f4059..d7ad32e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -20,15 +20,15 @@ package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.state.MutableTableState;
-public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoStreamReduceInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- private CoReduceFunction<IN1, IN2, OUT> coReducer;
private int keyPosition1;
- private MutableTableState<Object, IN1> values1;
-
private int keyPosition2;
+ private MutableTableState<Object, IN1> values1;
private MutableTableState<Object, IN2> values2;
+ IN1 reduced1;
+ IN2 reduced2;
public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
int keyPosition2) {
@@ -43,43 +43,43 @@ public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
@Override
public void handleStream1() throws Exception {
Object key = reuse1.getField(keyPosition1);
- IN1 currentValue = values1.get(key);
- IN1 nextValue = reuse1.getObject();
- if (currentValue != null) {
- IN1 reduced = coReducer.reduce1(currentValue, nextValue);
- values1.put(key, reduced);
- collector.collect(coReducer.map1(reduced));
+ currentValue1 = values1.get(key);
+ nextValue1 = reuse1.getObject();
+ if (currentValue1 != null) {
+ callUserFunctionAndLogException1();
+ values1.put(key, reduced1);
+ collector.collect(coReducer.map1(reduced1));
} else {
- values1.put(key, nextValue);
- collector.collect(coReducer.map1(nextValue));
+ values1.put(key, nextValue1);
+ collector.collect(coReducer.map1(nextValue1));
}
}
@Override
public void handleStream2() throws Exception {
Object key = reuse2.getField(keyPosition2);
- IN2 currentValue = values2.get(key);
- IN2 nextValue = reuse2.getObject();
- if (currentValue != null) {
- IN2 reduced = coReducer.reduce2(currentValue, nextValue);
- values2.put(key, reduced);
- collector.collect(coReducer.map2(reduced));
+ currentValue2 = values2.get(key);
+ nextValue2 = reuse2.getObject();
+ if (currentValue2 != null) {
+ callUserFunctionAndLogException2();
+ values2.put(key, reduced2);
+ collector.collect(coReducer.map2(reduced2));
} else {
- values2.put(key, nextValue);
- collector.collect(coReducer.map2(nextValue));
+ values2.put(key, nextValue2);
+ collector.collect(coReducer.map2(nextValue2));
}
}
@Override
- protected void coUSerFunction1() throws Exception {
- // TODO Auto-generated method stub
-
+ protected void callUserFunction1() throws Exception {
+ reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+
}
@Override
- protected void coUserFunction2() throws Exception {
- // TODO Auto-generated method stub
-
+ protected void callUserFunction2() throws Exception {
+ reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 585bd72..e9fefc3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,12 +17,15 @@
package org.apache.flink.streaming.api.invokable.operator.co;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
@@ -31,6 +34,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
private static final long serialVersionUID = 1L;
+	private static final Log LOG = LogFactory.getLog(CoInvokable.class); // log under this class, not the StreamInvokable parent
protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
protected StreamRecord<IN1> reuse1;
@@ -67,7 +71,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
this.reuse2 = serializer2.createInstance();
}
-
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
@@ -75,7 +78,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
immutableInvoke();
}
}
-
+
protected void immutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
@@ -90,7 +93,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
}
}
-
+
protected void mutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
@@ -107,9 +110,31 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
protected abstract void handleStream1() throws Exception;
protected abstract void handleStream2() throws Exception;
+
+ protected abstract void callUserFunction1() throws Exception;
+
+ protected abstract void callUserFunction2() throws Exception;
- protected abstract void coUSerFunction1() throws Exception;
-
- protected abstract void coUserFunction2() throws Exception;
+	/** Invokes callUserFunction1(); any Exception is logged at error level instead of being rethrown. */
+	protected void callUserFunctionAndLogException1() {
+		try {
+			callUserFunction1();
+		} catch (Exception ex) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s", StringUtils.stringifyException(ex)));
+			}
+		}
+	}
+
+	/** Invokes callUserFunction2(); any Exception is logged at error level instead of being rethrown. */
+	protected void callUserFunctionAndLogException2() {
+		try {
+			callUserFunction2();
+		} catch (Exception ex) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error(String.format("Calling user function failed due to: %s", StringUtils.stringifyException(ex)));
+			}
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 6179d2e..6c179e7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.api.invokable.operator.co;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -30,41 +28,27 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
super(mapper);
this.mapper = mapper;
}
-
+
@Override
public void handleStream1() throws Exception {
- collector.collect(mapper.map1(reuse1.getObject()));
+ callUserFunctionAndLogException1();
}
-
+
@Override
public void handleStream2() throws Exception {
- collector.collect(mapper.map2(reuse2.getObject()));
+ callUserFunctionAndLogException2();
}
@Override
- public void open(Configuration parameters) throws Exception {
- if (mapper instanceof RichFunction) {
- ((RichFunction) mapper).open(parameters);
- }
- }
+ protected void callUserFunction1() throws Exception {
+ collector.collect(mapper.map1(reuse1.getObject()));
- @Override
- public void close() throws Exception {
- if (mapper instanceof RichFunction) {
- ((RichFunction) mapper).close();
- }
}
@Override
- protected void coUSerFunction1() throws Exception {
- // TODO Auto-generated method stub
-
- }
+ protected void callUserFunction2() throws Exception {
+ collector.collect(mapper.map2(reuse2.getObject()));
- @Override
- protected void coUserFunction2() throws Exception {
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100755
index 4a4a280..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- private CoReduceFunction<IN1, IN2, OUT> coReducer;
- private IN1 currentValue1 = null;
- private IN2 currentValue2 = null;
-
- public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- super(coReducer);
- this.coReducer = coReducer;
- }
-
- @Override
- public void handleStream1() throws Exception {
- IN1 nextValue = reuse1.getObject();
- if (currentValue1 != null) {
- currentValue1 = coReducer.reduce1(currentValue1, nextValue);
- collector.collect(coReducer.map1(currentValue1));
- } else {
- currentValue1 = nextValue;
- collector.collect(coReducer.map1(nextValue));
- }
- }
-
- @Override
- public void handleStream2() throws Exception {
- IN2 nextValue = reuse2.getObject();
- if (currentValue2 != null) {
- currentValue2 = coReducer.reduce2(currentValue2, nextValue);
- collector.collect(coReducer.map2(currentValue2));
- } else {
- currentValue2 = nextValue;
- collector.collect(coReducer.map2(nextValue));
- }
- }
-
- @Override
- protected void coUSerFunction1() throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void coUserFunction2() throws Exception {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
new file mode 100644
index 0000000..e4acb99
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+
+/** Keeps one running reduced value per input and emits its mapped form for every incoming record. */
+public abstract class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+	private static final long serialVersionUID = 1L;
+	protected CoReduceFunction<IN1, IN2, OUT> coReducer;
+	// Running reduced value of each input; null means nothing has been accumulated yet.
+	protected IN1 currentValue1 = null;
+	protected IN2 currentValue2 = null;
+	// Object most recently taken from reuse1 / reuse2 by the handleStream methods.
+	protected IN1 nextValue1 = null;
+	protected IN2 nextValue2 = null;
+
+	public CoStreamReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+		super(coReducer);
+		this.coReducer = coReducer;
+	}
+	/** Stores the first input's current object, then runs callUserFunction1 through the logging wrapper. */
+	@Override
+	public void handleStream1() throws Exception {
+		nextValue1 = reuse1.getObject();
+		callUserFunctionAndLogException1();
+	}
+	/** Stores the second input's current object, then runs callUserFunction2 through the logging wrapper. */
+	@Override
+	public void handleStream2() throws Exception {
+		nextValue2 = reuse2.getObject();
+		callUserFunctionAndLogException2();
+	}
+	/** Folds nextValue1 into currentValue1 (or adopts it when none exists) and collects map1 of the result. */
+	@Override
+	protected void callUserFunction1() throws Exception {
+		if (currentValue1 != null) {
+			currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
+		} else {
+			currentValue1 = nextValue1;
+		}
+		collector.collect(coReducer.map1(currentValue1));
+	}
+	/** Folds nextValue2 into currentValue2 (or adopts it when none exists) and collects map2 of the result. */
+	@Override
+	protected void callUserFunction2() throws Exception {
+		if (currentValue2 != null) {
+			currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
+		} else {
+			currentValue2 = nextValue2;
+		}
+		// Emit the reduced value, not the raw incoming record, mirroring callUserFunction1.
+		collector.collect(coReducer.map2(currentValue2));
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f149197f/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
index e1d10b8..bf9a772 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -60,7 +60,7 @@ public class CoGroupReduceTest {
@SuppressWarnings("unchecked")
@Test
- public void coFlatMapTest() {
+ public void coGroupReduceTest() {
Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");