You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:23 UTC
[46/51] [abbrv] git commit: [streaming] Exception handling update & minor refactor
[streaming] Exception handling update & minor refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/35cf874c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/35cf874c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/35cf874c
Branch: refs/heads/master
Commit: 35cf874c36fc522e9fae6af4625901ee3c3bc9a3
Parents: e6766fd
Author: ghermann <re...@gmail.com>
Authored: Mon Aug 11 19:56:35 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:23:43 2014 +0200
----------------------------------------------------------------------
.../streaming/connectors/rabbitmq/RMQSink.java | 2 +-
.../flink/streaming/api/StreamConfig.java | 10 ---
.../api/collector/DirectedStreamCollector.java | 6 +-
.../environment/RemoteStreamEnvironment.java | 6 +-
.../streaming/api/invokable/SinkInvokable.java | 9 ++-
.../api/invokable/SourceInvokable.java | 1 +
.../api/invokable/StreamComponentInvokable.java | 3 +-
.../api/invokable/StreamRecordInvokable.java | 20 ++++-
.../operator/BatchGroupReduceInvokable.java | 22 ++++--
.../operator/BatchReduceInvokable.java | 1 +
.../api/invokable/operator/FilterInvokable.java | 12 ++-
.../invokable/operator/FlatMapInvokable.java | 9 ++-
.../operator/GroupReduceInvokable.java | 15 +++-
.../api/invokable/operator/MapInvokable.java | 9 ++-
.../operator/StreamReduceInvokable.java | 9 ++-
.../operator/WindowGroupReduceInvokable.java | 17 +++-
.../api/invokable/operator/co/CoInvokable.java | 2 -
.../AbstractStreamComponent.java | 82 ++++++++++++++++----
.../api/streamcomponent/CoStreamTask.java | 33 +-------
.../streamcomponent/StreamIterationSource.java | 20 ++---
.../api/streamcomponent/StreamRecordWriter.java | 23 +++---
.../api/streamcomponent/StreamSource.java | 37 +--------
.../api/streamcomponent/StreamTask.java | 38 ++-------
.../flink/streaming/state/StateManager.java | 15 ++--
.../flink/streaming/util/ClusterUtil.java | 14 ++--
.../flink/streaming/util/TestDataUtil.java | 20 ++---
26 files changed, 229 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 12b6341..4a89243 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -60,7 +60,7 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
channel = connection.createChannel();
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
initDone = true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3d124c2..4dd4cc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -119,12 +119,6 @@ public class StreamConfig {
}
}
- // @SuppressWarnings("unchecked")
- // public <T extends StreamComponentInvokable> Class<? extends T>
- // getUserInvokableClass() {
- // return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
- // }
-
public <T> StreamComponentInvokable<T> getUserInvokableObject() {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
@@ -156,10 +150,6 @@ public class StreamConfig {
}
}
- // public void setFunctionName(String functionName) {
- // config.setString(FUNCTION_NAME, functionName);
- // }
-
public String getFunctionName() {
return config.getString(FUNCTION_NAME, "");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 285a7b4..9750d44 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.StringUtils;
public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
OutputSelector<OUT> outputSelector;
- private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
+ private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
/**
@@ -97,8 +97,8 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
}
}
} catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error(String.format("Emit to %s failed due to: %s", outputName,
+ if (LOG.isErrorEnabled()) {
+ LOG.error(String.format("Emit to %s failed due to: %s", outputName,
StringUtils.stringifyException(e)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 19a2d48..ec6bc6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
- private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
+ private static final Log LOG = LogFactory.getLog(RemoteStreamEnvironment.class);
private String host;
private int port;
@@ -72,8 +72,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
@Override
public void execute() {
- if (log.isInfoEnabled()) {
- log.info("Running remotely at " + host + ":" + port);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Running remotely at " + host + ":" + port);
}
JobGraph jobGraph = jobGraphBuilder.getJobGraph();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 887df8b..9219f80 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -34,7 +34,7 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- sinkFunction.invoke((IN) reuse.getObject());
+ callUserFunctionAndLogException();
resetReuse();
}
}
@@ -42,8 +42,13 @@ public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- sinkFunction.invoke((IN) reuse.getObject());
+ callUserFunctionAndLogException();
}
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ sinkFunction.invoke((IN) reuse.getObject());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index a4be1e8..666427f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -35,6 +35,7 @@ public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implemen
this.sourceFunction = sourceFunction;
}
+ @Override
public void invoke() throws Exception {
sourceFunction.invoke(collector);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index ed718f1..4b49252 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -65,5 +65,6 @@ public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction
((RichFunction) userFunction).close();
}
}
-
+
+ public abstract void invoke() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 27dc05a..d4fc92c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,11 +21,14 @@ package org.apache.flink.streaming.api.invokable;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
public abstract class StreamRecordInvokable<IN, OUT> extends
StreamComponentInvokable<OUT> {
@@ -35,6 +38,7 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
}
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(StreamComponentInvokable.class);
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
StreamRecordSerializer<IN> serializer;
@@ -59,7 +63,7 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
try {
reuse = recordIterator.next(reuse);
} catch (IOException e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
return reuse;
}
@@ -68,6 +72,20 @@ public abstract class StreamRecordInvokable<IN, OUT> extends
protected abstract void mutableInvoke() throws Exception;
+ protected abstract void callUserFunction() throws Exception;
+
+ protected void callUserFunctionAndLogException() {
+ try {
+ callUserFunction();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error(String.format("Calling user function failed due to: %s",
+ StringUtils.stringifyException(e)));
+ }
+ }
+ }
+
+ @Override
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
index c64bd33..327ddaf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceInvokable.java
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.state.MutableTableState;
public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN> {
+ private static final long serialVersionUID = 1L;
+
int keyPosition;
protected ReduceFunction<IN> reducer;
private Iterator<StreamRecord<IN>> iterator;
@@ -40,18 +42,22 @@ public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN>
values = new MutableTableState<Object, IN>();
}
+ private IN reduced;
+ private IN nextValue;
+ private IN currentValue;
+
@Override
- protected void reduce() throws Exception {
+ protected void reduce() {
iterator = state.getStreamRecordIterator();
while (iterator.hasNext()) {
StreamRecord<IN> nextRecord = iterator.next();
- IN nextValue = nextRecord.getObject();
+ nextValue = nextRecord.getObject();
Object key = nextRecord.getField(keyPosition);
- IN currentValue = values.get(key);
+ currentValue = values.get(key);
if (currentValue != null) {
- IN reduced = reducer.reduce(currentValue, nextValue);
+ callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
@@ -59,12 +65,12 @@ public class BatchGroupReduceInvokable<IN> extends BatchReduceInvokable<IN, IN>
collector.collect(nextValue);
}
}
- System.out.println(values);
values.clear();
- System.out.println(values);
-
}
- private static final long serialVersionUID = 1L;
+ @Override
+ protected void callUserFunction() throws Exception {
+ reduced = reducer.reduce(currentValue, nextValue);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 3d1d813..a6516f8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -71,4 +71,5 @@ public class BatchReduceInvokable<IN, OUT> extends StreamReduceInvokable<IN, OUT
}
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 388920c..9e0edd7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -33,10 +33,13 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
this.filterFunction = filterFunction;
}
+ private boolean canCollect;
+
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- if (filterFunction.filter(reuse.getObject())) {
+ callUserFunctionAndLogException();
+ if (canCollect) {
collector.collect(reuse.getObject());
}
resetReuse();
@@ -46,10 +49,15 @@ public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- if (filterFunction.filter(reuse.getObject())) {
+ callUserFunctionAndLogException();
+ if (canCollect) {
collector.collect(reuse.getObject());
}
}
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ canCollect = filterFunction.filter(reuse.getObject());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 4cb4712..0b4b4d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -35,7 +35,7 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- flatMapper.flatMap(reuse.getObject(), collector);
+ callUserFunctionAndLogException();
resetReuse();
}
}
@@ -43,8 +43,13 @@ public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- flatMapper.flatMap(reuse.getObject(), collector);
+ callUserFunctionAndLogException();
}
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ flatMapper.flatMap(reuse.getObject(), collector);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
index 277078b..43ab483 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
@@ -52,12 +52,16 @@ public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
}
}
+ private IN reduced;
+ private IN nextValue;
+ private IN currentValue;
+
private void reduce() throws Exception {
Object key = reuse.getField(keyPosition);
- IN currentValue = values.get(key);
- IN nextValue = reuse.getObject();
+ currentValue = values.get(key);
+ nextValue = reuse.getObject();
if (currentValue != null) {
- IN reduced = reducer.reduce(currentValue, nextValue);
+ callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
@@ -66,4 +70,9 @@ public class GroupReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
}
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ reduced = reducer.reduce(currentValue, nextValue);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 53e85e0..2e501ca 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -35,7 +35,7 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
protected void immutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- collector.collect(mapper.map(reuse.getObject()));
+ callUserFunctionAndLogException();
resetReuse();
}
}
@@ -43,7 +43,12 @@ public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
@Override
protected void mutableInvoke() throws Exception {
while ((reuse = recordIterator.next(reuse)) != null) {
- collector.collect(mapper.map(reuse.getObject()));
+ callUserFunctionAndLogException();
}
}
+
+ @Override
+ protected void callUserFunction() throws Exception {
+ collector.collect(mapper.map(reuse.getObject()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index f6515dc..c73bb95 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -71,11 +71,16 @@ public abstract class StreamReduceInvokable<IN, OUT> extends UserTaskInvokable<I
}
}
- protected void reduce() throws Exception {
+ protected void reduce() {
userIterator = state.getIterator();
+ callUserFunctionAndLogException();
+ }
+
+ @Override
+ protected void callUserFunction() throws Exception {
reducer.reduce(userIterable, collector);
}
-
+
private void collectOneUnit() {
ArrayList<StreamRecord<IN>> list;
list = new ArrayList<StreamRecord<IN>>(listSize);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
index 065df25..ac57220 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -40,18 +40,22 @@ public class WindowGroupReduceInvokable<IN> extends WindowReduceInvokable<IN, IN
values = new MutableTableState<Object, IN>();
}
+ private IN reduced;
+ private IN nextValue;
+ private IN currentValue;
+
@Override
- protected void reduce() throws Exception {
+ protected void reduce() {
iterator = state.getStreamRecordIterator();
while (iterator.hasNext()) {
StreamRecord<IN> nextRecord = iterator.next();
- IN nextValue = nextRecord.getObject();
+ nextValue = nextRecord.getObject();
Object key = nextRecord.getField(keyPosition);
- IN currentValue = values.get(key);
+ currentValue = values.get(key);
if (currentValue != null) {
- IN reduced = reducer.reduce(currentValue, nextValue);
+ callUserFunctionAndLogException();
values.put(key, reduced);
collector.collect(reduced);
} else {
@@ -62,6 +66,11 @@ public class WindowGroupReduceInvokable<IN> extends WindowReduceInvokable<IN, IN
values.clear();
}
+ @Override
+ protected void callUserFunction() throws Exception {
+ reduced = reducer.reduce(currentValue, nextValue);
+ }
+
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 1e8623c..8e002ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -65,6 +65,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends
this.reuse1 = serializer1.createInstance();
this.reuse2 = serializer2.createInstance();
}
-
- public abstract void invoke() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index c973894..9171a00 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -20,6 +20,7 @@
package org.apache.flink.streaming.api.streamcomponent;
import java.io.IOException;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.SerializationUtils;
@@ -54,7 +55,8 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
protected TypeInformation<OUT> outTypeInfo = null;
protected StreamRecordSerializer<OUT> outSerializer = null;
protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
-
+ protected OutputHandler outputHandler = createEmptyOutputHandler();
+
protected StreamConfig configuration;
protected TypeSerializerWrapper<?, ?, OUT> typeWrapper;
protected StreamCollector<OUT> collector;
@@ -79,6 +81,71 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
setCollector();
}
+ protected class OutputHandler {
+ private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+
+ public OutputHandler() {
+ this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ }
+
+ public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
+ return outputs;
+ }
+
+ public void setConfigOutputs() {
+ setSerializers();
+ setCollector();
+
+ int numberOfOutputs = configuration.getNumberOfOutputs();
+ bufferTimeout = configuration.getBufferTimeout();
+
+ for (int i = 0; i < numberOfOutputs; i++) {
+ setPartitioner(i, outputs);
+ }
+ }
+
+ public void flushOutputs() throws IOException, InterruptedException {
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+ output.flush();
+ }
+ }
+
+ public void initializeOutputSerializers() {
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
+ output.initializeSerializers();
+ }
+ }
+
+ public void invokeUserFunction(String componentTypeName, StreamComponentInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(componentTypeName + " " + name + " invoked with instance id "
+ + instanceID);
+ }
+
+ initializeOutputSerializers();
+
+ try {
+ userInvokable.open(getTaskConfiguration());
+ userInvokable.invoke();
+ userInvokable.close();
+ } catch (Exception e) {
+ flushOutputs();
+ throw new RuntimeException(e);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(componentTypeName + " " + name + " invoke finished with instance id "
+ + instanceID);
+ }
+
+ flushOutputs();
+ }
+ }
+
+ private OutputHandler createEmptyOutputHandler() {
+ return new OutputHandler();
+ }
+
protected void initialize() {
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getComponentName();
@@ -111,19 +178,6 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
outSerializationDelegate.setInstance(outSerializer.createInstance());
}
- protected void setConfigOutputs(
- List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
- setSerializers();
- setCollector();
-
- int numberOfOutputs = configuration.getNumberOfOutputs();
- bufferTimeout = configuration.getBufferTimeout();
-
- for (int i = 0; i < numberOfOutputs; i++) {
- setPartitioner(i, outputs);
- }
- }
-
private void setPartitioner(int outputNumber,
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
StreamPartitioner<OUT> outputPartitioner = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index c06e664..a595cfa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -20,19 +20,13 @@
package org.apache.flink.streaming.api.streamcomponent;
import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -42,7 +36,6 @@ import org.apache.flink.util.MutableObjectIterator;
public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
AbstractStreamComponent<OUT> {
- private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
@@ -52,13 +45,11 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
MutableObjectIterator<StreamRecord<IN1>> inputIter1;
MutableObjectIterator<StreamRecord<IN2>> inputIter2;
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private CoInvokable<IN1, IN2, OUT> userInvokable;
private static int numTasks;
public CoStreamTask() {
-
- outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ outputHandler = new OutputHandler();
userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
@@ -94,7 +85,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
@Override
public void setInputsOutputs() {
- setConfigOutputs(outputs);
+ outputHandler.setConfigOutputs();
setConfigInputs();
inputIter1 = createInputIterator(inputs1, inputDeserializer1);
@@ -148,25 +139,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
@Override
public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.initializeSerializers();
- }
-
- userInvokable.open(getTaskConfiguration());
- userInvokable.invoke();
- userInvokable.close();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
- }
+ outputHandler.invokeUserFunction("CO-TASK", userInvokable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 2514eb0..8f32dd7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -37,7 +35,6 @@ public class StreamIterationSource<OUT extends Tuple> extends
private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private static int numSources;
private String iterationId;
@SuppressWarnings("rawtypes")
@@ -48,7 +45,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
@SuppressWarnings("rawtypes")
public StreamIterationSource() {
- outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ outputHandler = new OutputHandler();
numSources = newComponent();
instanceID = numSources;
dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
@@ -57,9 +54,8 @@ public class StreamIterationSource<OUT extends Tuple> extends
@Override
public void setInputsOutputs() {
try {
- setConfigOutputs(outputs);
+ outputHandler.setConfigOutputs();
} catch (StreamComponentException e) {
- e.printStackTrace();
throw new StreamComponentException("Cannot register outputs", e);
}
@@ -82,9 +78,8 @@ public class StreamIterationSource<OUT extends Tuple> extends
LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
}
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.initializeSerializers();
- }
+ outputHandler.initializeOutputSerializers();
+
StreamRecord<OUT> nextRecord;
while (true) {
@@ -96,16 +91,13 @@ public class StreamIterationSource<OUT extends Tuple> extends
if (nextRecord == null) {
break;
}
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
outSerializationDelegate.setInstance(nextRecord);
output.emit(outSerializationDelegate);
}
}
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
- }
-
+ outputHandler.flushOutputs();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
index a89935a..f695cb1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamRecordWriter.java
@@ -12,7 +12,8 @@ import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
+public class StreamRecordWriter<T extends IOReadableWritable> extends
+ RecordWriter<T> {
private final BufferProvider bufferPool;
@@ -31,12 +32,13 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
this(invokable, new RoundRobinChannelSelector<T>(), 1000);
}
- public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+ public StreamRecordWriter(AbstractInvokable invokable,
+ ChannelSelector<T> channelSelector) {
this(invokable, channelSelector, 1000);
}
- public StreamRecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector,
- long timeout) {
+ public StreamRecordWriter(AbstractInvokable invokable,
+ ChannelSelector<T> channelSelector, long timeout) {
// initialize the gate
super(invokable);
@@ -60,20 +62,24 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
@Override
public void emit(final T record) throws IOException, InterruptedException {
- for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+ for (int targetChannel : this.channelSelector.selectChannels(record,
+ this.numChannels)) {
// serialize with corresponding serializer and send full buffer
RecordSerializer<T> serializer = this.serializers[targetChannel];
synchronized (serializer) {
- RecordSerializer.SerializationResult result = serializer.addRecord(record);
+ RecordSerializer.SerializationResult result = serializer
+ .addRecord(record);
while (result.isFullBuffer()) {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
sendBuffer(buffer, targetChannel);
}
- buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+ buffer = this.bufferPool
+ .requestBufferBlocking(this.bufferPool
+ .getBufferSize());
result = serializer.setNextBuffer(buffer);
}
}
@@ -105,9 +111,8 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
Thread.sleep(timeout);
flush();
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
-
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 6644d6f..494dfc1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -19,28 +19,16 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
- private static final Log LOG = LogFactory.getLog(StreamSource.class);
-
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private SourceInvokable<OUT> userInvokable;
private static int numSources;
public StreamSource() {
-
- outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ outputHandler = new OutputHandler();
userInvokable = null;
numSources = newComponent();
instanceID = numSources;
@@ -49,7 +37,7 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
@Override
public void setInputsOutputs() {
try {
- setConfigOutputs(outputs);
+ outputHandler.setConfigOutputs();
} catch (StreamComponentException e) {
throw new StreamComponentException("Cannot register outputs for "
+ getClass().getSimpleName(), e);
@@ -59,31 +47,12 @@ public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamCo
@Override
protected void setInvokable() {
userInvokable = getInvokable();
- // setCollector();
userInvokable.setCollector(collector);
}
@Override
public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.initializeSerializers();
- }
-
- userInvokable.open(getTaskConfiguration());
- userInvokable.invoke();
- userInvokable.close();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
- }
+ outputHandler.invokeUserFunction("SOURCE", userInvokable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index f809dae..32c10b6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -19,28 +19,17 @@
package org.apache.flink.streaming.api.streamcomponent;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
SingleInputAbstractStreamComponent<IN, OUT> {
- private static final Log LOG = LogFactory.getLog(StreamTask.class);
-
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
private StreamRecordInvokable<IN, OUT> userInvokable;
private static int numTasks;
public StreamTask() {
- outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+ outputHandler = new OutputHandler();
userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
@@ -49,7 +38,7 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
@Override
public void setInputsOutputs() {
setConfigInputs();
- setConfigOutputs(outputs);
+ outputHandler.setConfigOutputs();
inputIter = createInputIterator(inputs, inputSerializer);
}
@@ -57,29 +46,12 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
@Override
protected void setInvokable() {
userInvokable = getInvokable();
- userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
+ userInvokable.initialize(collector, inputIter, inputSerializer,
+ isMutable);
}
@Override
public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("TASK " + name + " invoked with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.initializeSerializers();
- }
-
- userInvokable.open(getTaskConfiguration());
- userInvokable.invoke();
- userInvokable.close();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
- }
-
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
- }
+ outputHandler.invokeUserFunction("TASK", userInvokable);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
index 46089f9..17d911a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StateManager.java
@@ -47,15 +47,12 @@ public class StateManager implements Runnable, Serializable {
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(new FileInputStream(filename));
- } catch (Exception e) {
- e.printStackTrace();
- }
- for (Object state : stateList) {
- try {
+ for (Object state : stateList) {
state = ois.readObject();
- } catch (Exception e) {
- e.printStackTrace();
}
+ ois.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@@ -67,7 +64,7 @@ public class StateManager implements Runnable, Serializable {
try {
oos = new ObjectOutputStream(new FileOutputStream(filename));
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
// take snapshot of every registered state.
while (true) {
@@ -78,7 +75,7 @@ public class StateManager implements Runnable, Serializable {
oos.flush();
}
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 3ea5b39..fc5978e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
public class ClusterUtil {
- private static final Log log = LogFactory.getLog(ClusterUtil.class);
+ private static final Log LOG = LogFactory.getLog(ClusterUtil.class);
/**
* Executes the given JobGraph locally, on a NepheleMiniCluster
@@ -51,8 +51,8 @@ public class ClusterUtil {
exec.setNumTaskTracker(numberOfTaskTrackers);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
- if (log.isInfoEnabled()) {
- log.info("Running on mini cluster");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Running on mini cluster");
}
try {
@@ -62,7 +62,7 @@ public class ClusterUtil {
exec.stop();
} catch (Exception e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}
@@ -71,8 +71,8 @@ public class ClusterUtil {
}
public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
- if (log.isInfoEnabled()) {
- log.info("Running on mini cluster");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Running on mini cluster");
}
Configuration configuration = jobGraph.getJobConfiguration();
@@ -82,7 +82,7 @@ public class ClusterUtil {
try {
client.run(jobGraph, true);
} catch (ProgramInvocationException e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35cf874c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
index 44e0aa7..b9e3c4e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
@@ -39,7 +39,7 @@ public class TestDataUtil {
// TODO: Exception handling
// TODO: check checksum after download
- private static final Log log = LogFactory.getLog(TestDataUtil.class);
+ private static final Log LOG = LogFactory.getLog(TestDataUtil.class);
public static final String testDataDir = "src/test/resources/testdata/";
public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
@@ -67,8 +67,8 @@ public class TestDataUtil {
}
if (file.exists()) {
- if (log.isInfoEnabled()) {
- log.info(fileName + " already exists.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info(fileName + " already exists.");
}
try {
@@ -78,16 +78,16 @@ public class TestDataUtil {
+ file.getAbsolutePath(), e);
}
if (!checkSumActaul.equals(checkSumDesired)) {
- if (log.isInfoEnabled()) {
- log.info("Checksum is incorrect.");
- log.info("Downloading file.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Checksum is incorrect.");
+ LOG.info("Downloading file.");
}
download(fileName);
}
} else {
- if (log.isInfoEnabled()) {
- log.info("File does not exist.");
- log.info("Downloading file.");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("File does not exist.");
+ LOG.info("Downloading file.");
}
download(fileName);
}
@@ -95,7 +95,7 @@ public class TestDataUtil {
}
public static void download(String fileName) {
- log.info("downloading " + fileName);
+ LOG.info("downloading " + fileName);
try {
URL website = new URL(testRepoUrl + fileName);