You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:59 UTC
[26/28] git commit: [streaming] StreamInvokable refactor and javadoc update + StreamRecordSerializer update
[streaming] StreamInvokable refactor and javadoc update + StreamRecordSerializer update
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/723cb27c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/723cb27c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/723cb27c
Branch: refs/heads/master
Commit: 723cb27c3402d7b2524fadc0697d6bc3a175d58e
Parents: 0731f77
Author: gyfora <gy...@gmail.com>
Authored: Wed Aug 27 17:57:00 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 14 +--
.../flink/streaming/api/StreamConfig.java | 4 +-
.../api/collector/DirectedStreamCollector.java | 2 +-
.../api/collector/StreamCollector.java | 2 +-
.../streaming/api/datastream/DataStream.java | 4 +-
.../streaming/api/invokable/SinkInvokable.java | 2 +-
.../api/invokable/SourceInvokable.java | 2 +-
.../api/invokable/StreamComponentInvokable.java | 68 -----------
.../api/invokable/StreamInvokable.java | 79 ++++++++++++
.../api/invokable/StreamOperatorInvokable.java | 119 +++++++++++++++++++
.../api/invokable/StreamRecordInvokable.java | 82 -------------
.../api/invokable/UserTaskInvokable.java | 32 -----
.../operator/BatchReduceInvokable.java | 4 +-
.../api/invokable/operator/FilterInvokable.java | 4 +-
.../invokable/operator/FlatMapInvokable.java | 4 +-
.../api/invokable/operator/MapInvokable.java | 4 +-
.../operator/StreamReduceInvokable.java | 4 +-
.../api/invokable/operator/co/CoInvokable.java | 4 +-
.../AbstractStreamComponent.java | 4 +-
.../api/streamcomponent/OutputHandler.java | 4 +-
.../api/streamcomponent/StreamSink.java | 4 +-
.../api/streamcomponent/StreamTask.java | 4 +-
.../api/streamrecord/StreamRecord.java | 14 ++-
.../streamrecord/StreamRecordSerializer.java | 38 +-----
.../flink/streaming/util/MockInvokable.java | 4 +-
25 files changed, 253 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index e41abd6..77c860b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
@@ -68,7 +68,7 @@ public class JobGraphBuilder {
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<?>>> connectionTypes;
private Map<String, String> operatorNames;
- private Map<String, StreamComponentInvokable<?>> invokableObjects;
+ private Map<String, StreamInvokable<?>> invokableObjects;
private Map<String, TypeSerializerWrapper<?>> typeWrapperIn1;
private Map<String, TypeSerializerWrapper<?>> typeWrapperIn2;
private Map<String, TypeSerializerWrapper<?>> typeWrapperOut1;
@@ -110,7 +110,7 @@ public class JobGraphBuilder {
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
operatorNames = new HashMap<String, String>();
- invokableObjects = new HashMap<String, StreamComponentInvokable<?>>();
+ invokableObjects = new HashMap<String, StreamInvokable<?>>();
typeWrapperIn1 = new HashMap<String, TypeSerializerWrapper<?>>();
typeWrapperIn2 = new HashMap<String, TypeSerializerWrapper<?>>();
typeWrapperOut1 = new HashMap<String, TypeSerializerWrapper<?>>();
@@ -232,7 +232,7 @@ public class JobGraphBuilder {
* Number of parallel instances created
*/
public <IN, OUT> void addTask(String componentName,
- UserTaskInvokable<IN, OUT> taskInvokableObject,
+ StreamOperatorInvokable<IN, OUT> taskInvokableObject,
TypeSerializerWrapper<?> inTypeWrapper,
TypeSerializerWrapper<?> outTypeWrapper, String operatorName,
byte[] serializedFunction, int parallelism) {
@@ -348,7 +348,7 @@ public class JobGraphBuilder {
*/
private void addComponent(String componentName,
Class<? extends AbstractInvokable> componentClass,
- StreamComponentInvokable<?> invokableObject, String operatorName,
+ StreamInvokable<?> invokableObject, String operatorName,
byte[] serializedFunction, int parallelism) {
componentClasses.put(componentName, componentClass);
@@ -387,7 +387,7 @@ public class JobGraphBuilder {
// Get vertex attributes
Class<? extends AbstractInvokable> componentClass = componentClasses
.get(componentName);
- StreamComponentInvokable<?> invokableObject = invokableObjects
+ StreamInvokable<?> invokableObject = invokableObjects
.get(componentName);
String operatorName = operatorNames.get(componentName);
byte[] serializedFunction = serializedFunctions.get(componentName);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 51dc4e3..445020a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -144,7 +144,7 @@ public class StreamConfig {
return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
}
- public void setUserInvokable(StreamComponentInvokable<?> invokableObject) {
+ public void setUserInvokable(StreamInvokable<?> invokableObject) {
if (invokableObject != null) {
config.setClass(USER_FUNCTION, invokableObject.getClass());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 82f3c50..162c5df 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -82,7 +82,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
*/
private void emit(StreamRecord<OUT> streamRecord) {
Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
- streamRecord.setId(channelID);
+ streamRecord.newId(channelID);
serializationDelegate.setInstance(streamRecord);
emitted.clear();
for (String outputName : outputNames) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index cd29b01..65098d4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -117,7 +117,7 @@ public class StreamCollector<OUT> implements Collector<OUT> {
* StreamRecord to emit.
*/
private void emit(StreamRecord<OUT> streamRecord) {
- streamRecord.setId(channelID);
+ streamRecord.newId(channelID);
serializationDelegate.setInstance(streamRecord);
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5d5909c..ead9c35 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -43,7 +43,7 @@ import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
@@ -836,7 +836,7 @@ public abstract class DataStream<OUT> {
String functionName, final Function function,
TypeSerializerWrapper<OUT> inTypeWrapper,
TypeSerializerWrapper<R> outTypeWrapper,
- UserTaskInvokable<OUT, R> functionInvokable) {
+ StreamOperatorInvokable<OUT, R> functionInvokable) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 1353a5a..75a0c34 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
private SinkFunction<IN> sinkFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 6411878..ad75157 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
deleted file mode 100644
index 5d9d5e5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-public abstract class StreamComponentInvokable<OUT> extends AbstractRichFunction implements
- Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("unused")
- private String componentName;
- @SuppressWarnings("unused")
- private int channelID;
- protected Collector<OUT> collector;
- protected Function userFunction;
-
- public StreamComponentInvokable(Function userFunction) {
- this.userFunction = userFunction;
- }
-
- public void setCollector(Collector<OUT> collector) {
- this.collector = collector;
- }
-
- public void setAttributes(String componentName, int channelID) {
- this.componentName = componentName;
- this.channelID = channelID;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (userFunction instanceof RichFunction) {
- ((RichFunction) userFunction).open(parameters);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (userFunction instanceof RichFunction) {
- ((RichFunction) userFunction).close();
- }
- }
-
- public abstract void invoke() throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
new file mode 100644
index 0000000..9a6f2cc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for all invokables in the streaming topology. Holds the
+ * user function and the collector that outputs are emitted to.
+ *
+ * @param <OUT>
+ * The output type of the invokable
+ */
+public abstract class StreamInvokable<OUT> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Collector<OUT> collector;
+ protected Function userFunction;
+
+ public StreamInvokable(Function userFunction) {
+ this.userFunction = userFunction;
+ }
+
+ public void setCollector(Collector<OUT> collector) {
+ this.collector = collector;
+ }
+
+ /**
+ * Opens the user defined function if it is a {@link RichFunction};
+ * otherwise does nothing.
+ *
+ * @param parameters
+ * The configuration passed on to the user function's open method
+ */
+ public void open(Configuration parameters) throws Exception {
+ if (userFunction instanceof RichFunction) {
+ ((RichFunction) userFunction).open(parameters);
+ }
+ }
+
+ /**
+ * Closes the user defined function if it is a {@link RichFunction};
+ * otherwise does nothing.
+ *
+ */
+ public void close() throws Exception {
+ if (userFunction instanceof RichFunction) {
+ ((RichFunction) userFunction).close();
+ }
+ }
+
+ /**
+ * Runs the invokable; subclasses implement the working mechanics of the
+ * operator here. NOTE(review): the earlier comment said this is called
+ * once when the operator is created - confirm against the caller.
+ */
+ public abstract void invoke() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
new file mode 100644
index 0000000..d92d1f0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamOperatorInvokable.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Base class for all operators in the streaming topology: invokables that
+ * read input records through an iterator and emit outputs to a collector.
+ *
+ * @param <IN>
+ * Input type of the operator
+ * @param <OUT>
+ * Output type of the operator
+ */
+public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<OUT> {
+
+ public StreamOperatorInvokable(Function userFunction) {
+ super(userFunction);
+ }
+
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(StreamOperatorInvokable.class);
+
+ protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+ protected StreamRecordSerializer<IN> serializer;
+ protected StreamRecord<IN> reuse;
+ protected boolean isMutable;
+
+ /**
+ * Initializes the {@link StreamOperatorInvokable} for input and output
+ * handling, and creates the first reuse record from the serializer
+ *
+ * @param collector
+ * Collector object for collecting the outputs for the operator
+ * @param recordIterator
+ * Iterator for reading in the input records
+ * @param serializer
+ * Serializer used to deserialize inputs
+ * @param isMutable
+ * Mutability setting for the operator
+ */
+ public void initialize(Collector<OUT> collector,
+ MutableObjectIterator<StreamRecord<IN>> recordIterator,
+ StreamRecordSerializer<IN> serializer, boolean isMutable) {
+ setCollector(collector);
+ this.recordIterator = recordIterator;
+ this.serializer = serializer;
+ this.reuse = serializer.createInstance();
+ this.isMutable = isMutable;
+ }
+
+ /**
+ * Re-initializes the object in which the next input record will be read in
+ */
+ protected void resetReuse() {
+ this.reuse = serializer.createInstance();
+ }
+
+ /**
+ * Called by {@link #invoke()} when the mutability setting is immutable
+ */
+ protected abstract void immutableInvoke() throws Exception;
+
+ /**
+ * Called by {@link #invoke()} when the mutability setting is mutable
+ */
+ protected abstract void mutableInvoke() throws Exception;
+
+ /**
+ * The call of the user implemented function should be implemented here
+ */
+ protected abstract void callUserFunction() throws Exception;
+
+ /**
+ * Calls the user function; a thrown exception is logged, not propagated
+ */
+ protected void callUserFunctionAndLogException() {
+ try {
+ callUserFunction();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error(String.format("Calling user function failed due to: %s",
+ StringUtils.stringifyException(e)));
+ }
+ }
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ if (this.isMutable) {
+ mutableInvoke();
+ } else {
+ immutableInvoke();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
deleted file mode 100644
index c5a6f0f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-
-public abstract class StreamRecordInvokable<IN, OUT> extends StreamComponentInvokable<OUT> {
-
- public StreamRecordInvokable(Function userFunction) {
- super(userFunction);
- }
-
- private static final long serialVersionUID = 1L;
- private static final Log LOG = LogFactory.getLog(StreamComponentInvokable.class);
-
- protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
- StreamRecordSerializer<IN> serializer;
- protected StreamRecord<IN> reuse;
- protected boolean isMutable;
-
- public void initialize(Collector<OUT> collector,
- MutableObjectIterator<StreamRecord<IN>> recordIterator,
- StreamRecordSerializer<IN> serializer, boolean isMutable) {
- setCollector(collector);
- this.recordIterator = recordIterator;
- this.serializer = serializer;
- this.reuse = serializer.createInstance();
- this.isMutable = isMutable;
- }
-
- protected void resetReuse() {
- this.reuse = serializer.createInstance();
- }
-
- protected abstract void immutableInvoke() throws Exception;
-
- protected abstract void mutableInvoke() throws Exception;
-
- protected abstract void callUserFunction() throws Exception;
-
- protected void callUserFunctionAndLogException() {
- try {
- callUserFunction();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Calling user function failed due to: %s",
- StringUtils.stringifyException(e)));
- }
- }
- }
-
- @Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- mutableInvoke();
- } else {
- immutableInvoke();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
deleted file mode 100644
index e374773..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-
-public abstract class UserTaskInvokable<IN, OUT> extends
- StreamRecordInvokable<IN, OUT> implements Serializable {
-
- public UserTaskInvokable(Function userFunction) {
- super(userFunction);
- }
-
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index df07675..693b1c8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -24,11 +24,11 @@ import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.SlidingWindowState;
-public class BatchReduceInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class BatchReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index df7d62b..6b31037 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
+public class FilterInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 2cabc88..c7c3e1e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private FlatMapFunction<IN, OUT> flatMapper;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 73dd350..7c0352e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private MapFunction<IN, OUT> mapper;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index 52039be..8b76d49 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-public class StreamReduceInvokable<IN> extends UserTaskInvokable<IN, IN> {
+public class StreamReduceInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<IN> reducer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 7baf687..585bd72 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -18,13 +18,13 @@
package org.apache.flink.streaming.api.invokable.operator.co;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.util.Collector;
-public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokable<OUT> {
+public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
public CoInvokable(Function userFunction) {
super(userFunction);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 88b8db1..a7f70ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
public abstract class AbstractStreamComponent extends AbstractInvokable {
@@ -51,7 +51,7 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
this.function = configuration.getFunction();
}
- protected <T> void invokeUserFunction(StreamComponentInvokable<T> userInvokable) throws Exception {
+ protected <T> void invokeUserFunction(StreamInvokable<T> userInvokable) throws Exception {
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();
userInvokable.close();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
index 71fb015..e19eeaa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.StreamRecordWriter;
@@ -153,7 +153,7 @@ public class OutputHandler<OUT> {
long startTime;
public void invokeUserFunction(String componentTypeName,
- StreamComponentInvokable<OUT> userInvokable) throws IOException, InterruptedException {
+ StreamInvokable<OUT> userInvokable) throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug(componentTypeName + " " + streamComponent.getName()
+ " invoked with instance id " + streamComponent.getInstanceID());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index f98e891..145b709 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.streamcomponent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
public class StreamSink<IN> extends AbstractStreamComponent {
@@ -27,7 +27,7 @@ public class StreamSink<IN> extends AbstractStreamComponent {
private InputHandler<IN> inputHandler;
- private StreamRecordInvokable<IN, IN> userInvokable;
+ private StreamOperatorInvokable<IN, IN> userInvokable;
public StreamSink() {
userInvokable = null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 898720e..55b2f98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -18,14 +18,14 @@
package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
private InputHandler<IN> inputHandler;
private OutputHandler<OUT> outputHandler;
- private StreamRecordInvokable<IN, OUT> userInvokable;
+ private StreamOperatorInvokable<IN, OUT> userInvokable;
private static int numTasks;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
index ef19755..2d02949 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
@@ -33,7 +33,7 @@ public class StreamRecord<T> implements Serializable {
public boolean isTuple;
/**
- * Creates an empty StreamRecord and initializes an empty ID
+ * Creates an empty StreamRecord
*/
public StreamRecord() {
uid = new UID();
@@ -53,12 +53,22 @@ public class StreamRecord<T> implements Serializable {
* ID of the emitting task
* @return The StreamRecord object
*/
- public StreamRecord<T> setId(int channelID) {
+ public StreamRecord<T> newId(int channelID) {
uid = new UID(channelID);
return this;
}
/**
+ * Sets the ID of the StreamRecord to the given UID (the reference is stored as-is, not copied)
+ *
+ * @param id
+ * id to set
+ */
+ public void setId(UID id) {
+ this.uid = id;
+ }
+
+ /**
* Gets the wrapped object from the StreamRecord
*
* @return The object wrapped
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 66cb0bd..9395293 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -50,8 +50,7 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
@Override
public StreamRecord<T> createInstance() {
try {
- @SuppressWarnings("unchecked")
- StreamRecord<T> t = StreamRecord.class.newInstance();
+ StreamRecord<T> t = new StreamRecord<T>();
t.isTuple = isTuple;
t.setObject(typeSerializer.createInstance());
return t;
@@ -62,15 +61,10 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
@Override
public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-
- return null;
- // for (int i = 0; i < arity; i++) {
- // Object copy = fieldSerializers[i].copy(from.getField(i),
- // reuse.getField(i));
- // reuse.setField(copy, i);
- // }
- //
- // return reuse;
+ reuse.isTuple = from.isTuple;
+ reuse.setId(from.getId().copy());
+ reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
+ return reuse;
}
@Override
@@ -94,27 +88,7 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
-
+ // TODO: needs to be implemented; currently a no-op that neither reads from source nor writes to target
}
- // @Override
- // public int hashCode() {
- // int hashCode = arity * 47;
- // for (TypeSerializer<?> ser : this.fieldSerializers) {
- // hashCode = (hashCode << 7) | (hashCode >>> -7);
- // hashCode += ser.hashCode();
- // }
- // return hashCode;
- // }
-
- // @Override
- // public boolean equals(Object obj) {
- // if (obj != null && obj instanceof StreamRecordSerializer) {
- // StreamRecordSerializer otherTS = (StreamRecordSerializer) obj;
- // return (otherTS.tupleClass == this.tupleClass)
- // && Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
- // } else {
- // return false;
- // }
- // }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/723cb27c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index dd8b029..47a64ac 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.types.TypeInformation;
@@ -88,7 +88,7 @@ public class MockInvokable<IN, OUT> {
return iterator;
}
- public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
+ public static <IN, OUT> List<OUT> createAndExecute(StreamOperatorInvokable<IN, OUT> invokable, List<IN> inputs) {
MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
try {