You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:43 UTC
[10/28] [streaming] Refactored stream components with InputHandler & OutputHandler
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
deleted file mode 100644
index 0c042bc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.MutableObjectIterator;
-
-public abstract class SingleInputAbstractStreamComponent<IN, OUT> extends
- AbstractStreamComponent<OUT> {
-
- protected StreamRecordSerializer<IN> inputSerializer = null;
- protected MutableObjectIterator<StreamRecord<IN>> inputIter;
- protected MutableReader<IOReadableWritable> inputs;
-
- protected void setDeserializers() {
- if (functionName.equals(SOURCE)) {
- setSerializer();
- } else {
- setDeserializer();
- }
- }
-
- @SuppressWarnings("unchecked")
- private void setDeserializer() {
- TypeInformation<IN> inTupleTypeInfo = (TypeInformation<IN>) typeWrapper
- .getInputTypeInfo1();
- inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
- }
-
- @SuppressWarnings("unchecked")
- protected void setSinkSerializer() {
- try {
- TypeInformation<IN> inputTypeInfo = (TypeInformation<IN>) typeWrapper
- .getOutputTypeInfo();
- inputSerializer = new StreamRecordSerializer<IN>(inputTypeInfo);
- } catch (RuntimeException e) {
- // User implemented sink, nothing to do
- }
- }
-
- @SuppressWarnings("unchecked")
- protected void setConfigInputs() throws StreamComponentException {
- setDeserializers();
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs < 2) {
-
- inputs = new MutableRecordReader<IOReadableWritable>(this);
-
- } else {
- MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
- for (int i = 0; i < numberOfInputs; i++) {
- recordReaders[i] = new MutableRecordReader<IOReadableWritable>(this);
- }
- inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 069e846..715d0cd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -29,10 +29,12 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.StringUtils;
public class StreamIterationSink<IN extends Tuple> extends
- SingleInputAbstractStreamComponent<IN, IN> {
+ AbstractStreamComponent {
private static final Log LOG = LogFactory.getLog(StreamIterationSink.class);
+ private InputHandler<IN> inputHandler;
+
private String iterationId;
@SuppressWarnings("rawtypes")
private BlockingQueue<StreamRecord> dataChannel;
@@ -45,16 +47,12 @@ public class StreamIterationSink<IN extends Tuple> extends
@Override
public void setInputsOutputs() {
try {
- setConfigInputs();
- setSinkSerializer();
-
- inputIter = createInputIterator(inputs, inputSerializer);
+ inputHandler = new InputHandler<IN>(this);
iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
shouldWait = iterationWaitTime > 0;
dataChannel = BlockingQueueBroker.instance().get(iterationId);
-
} catch (Exception e) {
throw new StreamComponentException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
@@ -64,24 +62,24 @@ public class StreamIterationSink<IN extends Tuple> extends
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + name + " invoked");
+ LOG.debug("SINK " + getName() + " invoked");
}
forwardRecords();
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + name + " invoke finished");
+ LOG.debug("SINK " + getName() + " invoke finished");
}
}
protected void forwardRecords() throws Exception {
- StreamRecord<IN> reuse = inputSerializer.createInstance();
- while ((reuse = inputIter.next(reuse)) != null) {
+ StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
+ while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
if (!pushToQueue(reuse)) {
break;
}
// TODO: Fix object reuse for iteration
- reuse = inputSerializer.createInstance();
+ reuse = inputHandler.getInputSerializer().createInstance();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index 8f32dd7..d278342 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -30,11 +30,12 @@ import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-public class StreamIterationSource<OUT extends Tuple> extends
- SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
private static final Log LOG = LogFactory.getLog(StreamIterationSource.class);
+ private OutputHandler<OUT> outputHandler;
+
private static int numSources;
private String iterationId;
@SuppressWarnings("rawtypes")
@@ -44,8 +45,6 @@ public class StreamIterationSource<OUT extends Tuple> extends
@SuppressWarnings("rawtypes")
public StreamIterationSource() {
-
- outputHandler = new OutputHandler();
numSources = newComponent();
instanceID = numSources;
dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
@@ -53,11 +52,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
@Override
public void setInputsOutputs() {
- try {
- outputHandler.setConfigOutputs();
- } catch (StreamComponentException e) {
- throw new StreamComponentException("Cannot register outputs", e);
- }
+ outputHandler = new OutputHandler<OUT>(this);
iterationId = configuration.getIterationId();
iterationWaitTime = configuration.getIterationWaitTime();
@@ -68,18 +63,17 @@ public class StreamIterationSource<OUT extends Tuple> extends
} catch (Exception e) {
}
-
}
@SuppressWarnings("unchecked")
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE " + name + " invoked with instance id " + instanceID);
+ LOG.debug("SOURCE " + getName() + " invoked with instance id " + getInstanceID());
}
outputHandler.initializeOutputSerializers();
-
+
StreamRecord<OUT> nextRecord;
while (true) {
@@ -91,9 +85,10 @@ public class StreamIterationSource<OUT extends Tuple> extends
if (nextRecord == null) {
break;
}
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler.getOutputs()) {
- outSerializationDelegate.setInstance(nextRecord);
- output.emit(outSerializationDelegate);
+ for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
+ .getOutputs()) {
+ outputHandler.outSerializationDelegate.setInstance(nextRecord);
+ output.emit(outputHandler.outSerializationDelegate);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index df95bda..31dd44f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -23,10 +23,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
+public class StreamSink<IN> extends AbstractStreamComponent {
private static final Log LOG = LogFactory.getLog(StreamSink.class);
+ private InputHandler<IN> inputHandler;
+
private StreamRecordInvokable<IN, IN> userInvokable;
public StreamSink() {
@@ -35,35 +37,26 @@ public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
@Override
public void setInputsOutputs() {
- try {
- setConfigInputs();
- setSinkSerializer();
-
- inputIter = createInputIterator(inputs, inputSerializer);
- } catch (Exception e) {
- throw new StreamComponentException("Cannot register inputs for "
- + getClass().getSimpleName(), e);
- }
+ inputHandler = new InputHandler<IN>(this);
}
@Override
protected void setInvokable() {
- userInvokable = getInvokable();
- userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
+ userInvokable = configuration.getUserInvokable();
+ userInvokable.initialize(null, inputHandler.getInputIter(), inputHandler.getInputSerializer(),
+ isMutable);
}
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + name + " invoked");
+ LOG.debug("SINK " + getName() + " invoked");
}
- userInvokable.open(getTaskConfiguration());
- userInvokable.invoke();
- userInvokable.close();
+ invokeUserFunction(userInvokable);
if (LOG.isDebugEnabled()) {
- LOG.debug("SINK " + name + " invoke finished");
+ LOG.debug("SINK " + getName() + " invoke finished");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 494dfc1..6413cb4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -22,37 +22,33 @@ package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
-public class StreamSource<OUT extends Tuple> extends SingleInputAbstractStreamComponent<Tuple, OUT> {
+public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent {
- private SourceInvokable<OUT> userInvokable;
+ protected OutputHandler<OUT> outputHandler;
+
+ private SourceInvokable<OUT> sourceInvokable;
+
private static int numSources;
public StreamSource() {
- outputHandler = new OutputHandler();
- userInvokable = null;
+ sourceInvokable = null;
numSources = newComponent();
instanceID = numSources;
}
@Override
public void setInputsOutputs() {
- try {
- outputHandler.setConfigOutputs();
- } catch (StreamComponentException e) {
- throw new StreamComponentException("Cannot register outputs for "
- + getClass().getSimpleName(), e);
- }
+ outputHandler = new OutputHandler<OUT>(this);
}
@Override
protected void setInvokable() {
- userInvokable = getInvokable();
- userInvokable.setCollector(collector);
+ sourceInvokable = configuration.getUserInvokable();
+ sourceInvokable.setCollector(outputHandler.getCollector());
}
@Override
public void invoke() throws Exception {
- outputHandler.invokeUserFunction("SOURCE", userInvokable);
+ outputHandler.invokeUserFunction("SOURCE", sourceInvokable);
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 32c10b6..cb017c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -22,14 +22,16 @@ package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
- SingleInputAbstractStreamComponent<IN, OUT> {
+public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
+
+ private InputHandler<IN> inputHandler;
+ private OutputHandler<OUT> outputHandler;
private StreamRecordInvokable<IN, OUT> userInvokable;
+
private static int numTasks;
public StreamTask() {
- outputHandler = new OutputHandler();
userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
@@ -37,17 +39,15 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
@Override
public void setInputsOutputs() {
- setConfigInputs();
- outputHandler.setConfigOutputs();
-
- inputIter = createInputIterator(inputs, inputSerializer);
+ inputHandler = new InputHandler<IN>(this);
+ outputHandler = new OutputHandler<OUT>(this);
}
@Override
protected void setInvokable() {
- userInvokable = getInvokable();
- userInvokable.initialize(collector, inputIter, inputSerializer,
- isMutable);
+ userInvokable = configuration.getUserInvokable();
+ userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
+ inputHandler.getInputSerializer(), isMutable);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index 2ac6a47..acde311 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -24,22 +24,18 @@ import java.io.IOException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN1, IN2, OUT> {
+public class FunctionTypeWrapper<T> extends TypeSerializerWrapper<T> {
private static final long serialVersionUID = 1L;
private Function function;
private Class<? extends Function> functionSuperClass;
- private int inTypeParameter1;
- private int inTypeParameter2;
- private int outTypeParameter;
+ private int typeParameterNumber;
public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
- int inTypeParameter1, int inTypeParameter2, int outTypeParameter) {
+ int typeParameterNumber) {
this.function = function;
this.functionSuperClass = functionSuperClass;
- this.inTypeParameter1 = inTypeParameter1;
- this.inTypeParameter2 = inTypeParameter2;
- this.outTypeParameter = outTypeParameter;
+ this.typeParameterNumber = typeParameterNumber;
setTypeInfo();
}
@@ -51,19 +47,9 @@ public class FunctionTypeWrapper<IN1, IN2, OUT> extends TypeSerializerWrapper<IN
@Override
protected void setTypeInfo() {
- if (inTypeParameter1 != -1) {
- inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
- inTypeParameter1, null, null);
- }
-
- if (inTypeParameter2 != -1) {
- inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
- inTypeParameter2, null, null);
- }
-
- if (outTypeParameter != -1) {
- outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
- outTypeParameter, null, null);
+ if (typeParameterNumber != -1) {
+ typeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
+ typeParameterNumber, null, null);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
index c1bf52c..7e079d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -23,18 +23,14 @@ import java.io.IOException;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-public class ObjectTypeWrapper<IN1, IN2, OUT> extends
- TypeSerializerWrapper<IN1, IN2, OUT> {
+public class ObjectTypeWrapper<T> extends
+ TypeSerializerWrapper<T> {
private static final long serialVersionUID = 1L;
- private IN1 inInstance1;
- private IN2 inInstance2;
- private OUT outInstance;
+ private T instance;
- public ObjectTypeWrapper(IN1 inInstance1, IN2 inInstance2, OUT outInstance) {
- this.inInstance1 = inInstance1;
- this.inInstance2 = inInstance2;
- this.outInstance = outInstance;
+ public ObjectTypeWrapper(T instance) {
+ this.instance = instance;
setTypeInfo();
}
@@ -46,14 +42,8 @@ public class ObjectTypeWrapper<IN1, IN2, OUT> extends
@Override
protected void setTypeInfo() {
- if (inInstance1 != null) {
- inTypeInfo1 = TypeExtractor.getForObject(inInstance1);
- }
- if (inInstance2 != null) {
- inTypeInfo2 = TypeExtractor.getForObject(inInstance2);
- }
- if (outInstance != null) {
- outTypeInfo = TypeExtractor.getForObject(outInstance);
+ if (instance != null) {
+ typeInfo = TypeExtractor.getForObject(instance);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
index 2aa50f2..33e5a94 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -23,33 +23,17 @@ import java.io.Serializable;
import org.apache.flink.types.TypeInformation;
-public abstract class TypeSerializerWrapper<IN1, IN2, OUT>
+public abstract class TypeSerializerWrapper<T>
implements Serializable {
private static final long serialVersionUID = 1L;
- protected transient TypeInformation<IN1> inTypeInfo1 = null;
- protected transient TypeInformation<IN2> inTypeInfo2 = null;
- protected transient TypeInformation<OUT> outTypeInfo = null;
-
- public TypeInformation<IN1> getInputTypeInfo1() {
- if (inTypeInfo1 == null) {
- throw new RuntimeException("There is no TypeInfo for the first input");
- }
- return inTypeInfo1;
- }
-
- public TypeInformation<IN2> getInputTypeInfo2() {
- if (inTypeInfo2 == null) {
- throw new RuntimeException("There is no TypeInfo for the second input");
- }
- return inTypeInfo2;
- }
-
- public TypeInformation<OUT> getOutputTypeInfo() {
- if (outTypeInfo == null) {
- throw new RuntimeException("There is no TypeInfo for the output");
+ protected transient TypeInformation<T> typeInfo = null;
+
+ public TypeInformation<T> getTypeInfo() {
+ if (typeInfo == null) {
+ throw new RuntimeException("There is no TypeInformation in the wrapper");
}
- return outTypeInfo;
+ return typeInfo;
}
protected abstract void setTypeInfo();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index cdcd993..58ec692 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.LogUtils;
@@ -26,12 +28,30 @@ public class PrintTest{
private static final long MEMORYSIZE = 32;
+ private static final class IdentityMap implements MapFunction<Long, Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ }
+
+ private static final class FilterAll implements FilterFunction<Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Long value) throws Exception {
+ return true;
+ }
+ }
+
@Test
public void test() throws Exception {
LogUtils.initializeDefaultTestConsoleLogger();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- env.generateSequence(1, 10).print();
+ env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
env.executeTest(MEMORYSIZE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1b046f4e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index 7347f62..05a9e76 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -21,22 +21,18 @@ package org.apache.flink.streaming.util.serialization;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.junit.Test;
public class TypeSerializationTest {
- private static class MyMap extends RichMapFunction<Tuple1<Integer>, Tuple1<String>> {
+ private static class MyMap extends RichMapFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@Override
- public Tuple1<String> map(Tuple1<Integer> value) throws Exception {
+ public String map(Integer value) throws Exception {
return null;
}
}
@@ -44,60 +40,35 @@ public class TypeSerializationTest {
@SuppressWarnings("unchecked")
@Test
public void functionTypeSerializationTest() {
- TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new FunctionTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
- new MyMap(), RichMapFunction.class, 0, -1, 1);
+ TypeSerializerWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
+ RichMapFunction.class, 0);
byte[] serializedType = SerializationUtils.serialize(ser);
- TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
+ TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
.deserialize(serializedType);
- assertNotNull(ser.getInputTypeInfo1());
- assertNotNull(ser2.getInputTypeInfo1());
+ assertNotNull(ser.getTypeInfo());
+ assertNotNull(ser2.getTypeInfo());
- assertNotNull(ser.getOutputTypeInfo());
- assertNotNull(ser2.getOutputTypeInfo());
-
- assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
- try {
- ser.getInputTypeInfo2();
- fail();
- } catch (RuntimeException e) {
- assertTrue(true);
- }
- assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
+ assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
}
@SuppressWarnings("unchecked")
@Test
public void objectTypeSerializationTest() {
- Integer instance1 = new Integer(22);
- Integer instance2 = null;
- Integer instance3 = new Integer(34);
-
- TypeSerializerWrapper<Integer, Integer, Integer> ser = new ObjectTypeWrapper<Integer, Integer, Integer>(
- instance1, instance2, instance3);
-
- // System.out.println(ser.getInputTupleTypeInfo1());
-
+ Integer instance = new Integer(22);
+
+ TypeSerializerWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
+
byte[] serializedType = SerializationUtils.serialize(ser);
- TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
+ TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
.deserialize(serializedType);
- assertNotNull(ser.getInputTypeInfo1());
- assertNotNull(ser2.getInputTypeInfo1());
-
- assertNotNull(ser.getOutputTypeInfo());
- assertNotNull(ser2.getOutputTypeInfo());
+ assertNotNull(ser.getTypeInfo());
+ assertNotNull(ser2.getTypeInfo());
- assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
- try {
- ser.getInputTypeInfo2();
- fail();
- } catch (RuntimeException e) {
- assertTrue(true);
- }
- assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
+ assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
}
}