You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:43 UTC
[06/51] [abbrv] git commit: [streaming] Added mutability switch for operators + cleanup
[streaming] Added mutability switch for operators + cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c21f0e35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c21f0e35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c21f0e35
Branch: refs/heads/master
Commit: c21f0e351e0976b66436ee55a5bc12fb461ae3fc
Parents: 0c1ef6d
Author: gyfora <gy...@gmail.com>
Authored: Mon Jul 21 11:24:49 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 6 ++
.../flink/streaming/api/JobGraphBuilder.java | 19 ++++--
.../api/StreamExecutionEnvironment.java | 52 ++++++++-------
.../api/invokable/DefaultSinkInvokable.java | 30 ---------
.../api/invokable/DefaultSourceInvokable.java | 35 ----------
.../api/invokable/DefaultTaskInvokable.java | 31 ---------
.../streaming/api/invokable/SinkInvokable.java | 15 +++--
.../api/invokable/StreamRecordInvokable.java | 10 ++-
.../api/invokable/UserSinkInvokable.java | 31 ---------
.../operator/BatchReduceInvokable.java | 17 +++--
.../api/invokable/operator/FilterInvokable.java | 16 ++++-
.../invokable/operator/FlatMapInvokable.java | 11 +++-
.../api/invokable/operator/MapInvokable.java | 11 +++-
.../AbstractStreamComponent.java | 2 +
.../api/streamcomponent/StreamSink.java | 11 ++--
.../api/streamcomponent/StreamSource.java | 3 +-
.../api/streamcomponent/StreamTask.java | 5 +-
.../api/streamrecord/StreamRecordTest.java | 68 --------------------
.../partitioner/BroadcastPartitionerTest.java | 10 +--
.../partitioner/FieldsPartitionerTest.java | 37 ++++++-----
.../partitioner/GlobalPartitionerTest.java | 14 ++--
.../partitioner/ShufflePartitionerTest.java | 24 +++----
22 files changed, 173 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index e3d19f9..d327fad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -129,6 +129,11 @@ public class DataStream<T extends Tuple> {
public String getId() {
return id;
}
+
+ public DataStream<T> setMutability(boolean isMutable){
+ environment.setMutability(this,isMutable);
+ return this;
+ }
/**
* Sets the degree of parallelism for this operator. The degree must be 1 or
@@ -590,6 +595,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
+ setMutability(false);
environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
return new DataStream<T>(this);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 02a73b1..2b3ee5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.jobgraph.JobTaskVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
@@ -70,6 +70,7 @@ public class JobGraphBuilder {
// Graph attributes
private Map<String, AbstractJobVertex> components;
private Map<String, Integer> componentParallelism;
+ private Map<String, Boolean> mutability;
private Map<String, List<String>> outEdgeList;
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
@@ -100,6 +101,7 @@ public class JobGraphBuilder {
components = new HashMap<String, AbstractJobVertex>();
componentParallelism = new HashMap<String, Integer>();
+ mutability = new HashMap<String, Boolean>();
outEdgeList = new HashMap<String, List<String>>();
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
@@ -216,7 +218,7 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
- public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
+ public void addSink(String componentName, SinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
@@ -286,6 +288,7 @@ public class JobGraphBuilder {
componentClasses.put(componentName, componentClass);
setParallelism(componentName, parallelism);
+ mutability.put(componentName, false);
invokableObjects.put(componentName, invokableObject);
operatorNames.put(componentName, operatorName);
serializedFunctions.put(componentName, serializedFunction);
@@ -333,6 +336,8 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
+ config.setBoolean("isMutable", mutability.get(componentName));
+
// Set vertex config
if (invokableObject != null) {
config.setClass("userfunction", invokableObject.getClass());
@@ -423,6 +428,10 @@ public class JobGraphBuilder {
componentParallelism.put(componentName, parallelism);
}
+ public void setMutability(String componentName, boolean isMutable) {
+ mutability.put(componentName, isMutable);
+ }
+
/**
* Connects two vertices in the JobGraph using the selected partitioner
* settings
@@ -574,8 +583,8 @@ public class JobGraphBuilder {
}
} catch (JobGraphDefinitionException e) {
- throw new RuntimeException("Cannot connect components: "
- + upStreamComponentName + " to " + downStreamComponentName, e);
+ throw new RuntimeException("Cannot connect components: " + upStreamComponentName
+ + " to " + downStreamComponentName, e);
}
int outputIndex = upStreamComponent.getNumberOfForwardConnections() - 1;
@@ -638,7 +647,7 @@ public class JobGraphBuilder {
public <T extends Tuple> void setOutputSelector(String componentName,
byte[] serializedOutputSelector) {
outputSelectors.put(componentName, serializedOutputSelector);
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Outputselector set for " + componentName);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index a14b150..c1eca4a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -121,6 +121,10 @@ public abstract class StreamExecutionEnvironment {
this.degreeOfParallelism = degreeOfParallelism;
}
+ protected void setMutability(DataStream<?> stream, boolean isMutable) {
+ jobGraphBuilder.setMutability(stream.getId(), isMutable);
+ }
+
/**
* Sets the number of hardware contexts (CPU cores / threads) used when
* executed in {@link LocalStreamEnvironment}.
@@ -226,7 +230,7 @@ public abstract class StreamExecutionEnvironment {
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
}
-
+
return returnStream;
}
@@ -331,7 +335,7 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.addIterationSink(returnStream.getId(), inputStream.getId(), iterationID,
inputStream.getParallelism(), iterationName);
-
+
jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
@@ -396,7 +400,7 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
-
+
/**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, in every millis milliseconds. For
@@ -420,6 +424,7 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
format, millis, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -447,6 +452,7 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
format, batchSize, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -473,6 +479,7 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
format, millis, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -500,6 +507,7 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
format, batchSize, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -558,25 +566,25 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.setParallelism(inputStream.getId(), inputStream.degreeOfParallelism);
}
-// /**
-// * Converts object to byte array using default java serialization
-// *
-// * @param object
-// * Object to be serialized
-// * @return Serialized object
-// */
-// static byte[] serializeToByteArray(Serializable object) {
-// SerializationUtils.serialize(object);
-// ByteArrayOutputStream baos = new ByteArrayOutputStream();
-// ObjectOutputStream oos;
-// try {
-// oos = new ObjectOutputStream(baos);
-// oos.writeObject(object);
-// } catch (IOException e) {
-// throw new RuntimeException("Cannot serialize object: " + object);
-// }
-// return baos.toByteArray();
-// }
+ // /**
+ // * Converts object to byte array using default java serialization
+ // *
+ // * @param object
+ // * Object to be serialized
+ // * @return Serialized object
+ // */
+ // static byte[] serializeToByteArray(Serializable object) {
+ // SerializationUtils.serialize(object);
+ // ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // ObjectOutputStream oos;
+ // try {
+ // oos = new ObjectOutputStream(baos);
+ // oos.writeObject(object);
+ // } catch (IOException e) {
+ // throw new RuntimeException("Cannot serialize object: " + object);
+ // }
+ // return baos.toByteArray();
+ // }
// --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
deleted file mode 100644
index 09e766b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSinkInvokable.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class DefaultSinkInvokable<T extends Tuple> extends UserSinkInvokable<T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke() throws Exception {
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
deleted file mode 100644
index 0920ea0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultSourceInvokable.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.util.Collector;
-
-public class DefaultSourceInvokable extends UserSourceInvokable<Tuple> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Collector<Tuple> collector) throws Exception {
- collector.collect(new Tuple1<String>("hello flink!"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
deleted file mode 100644
index c86ffcb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/DefaultTaskInvokable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public class DefaultTaskInvokable<T extends Tuple> extends UserTaskInvokable<T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke() throws Exception {
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index c6e4e36..3c14490 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -22,7 +22,7 @@ package org.apache.flink.streaming.api.invokable;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
+public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
private SinkFunction<IN> sinkFunction;
@@ -33,8 +33,15 @@ public class SinkInvokable<IN extends Tuple> extends UserSinkInvokable<IN> {
@Override
public void invoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- sinkFunction.invoke((IN) reuse.getTuple());
- }
+ if (this.isMutable) {
+ while (recordIterator.next(reuse) != null) {
+ sinkFunction.invoke((IN) reuse.getTuple());
+ }
+ } else {
+ while (recordIterator.next(reuse) != null) {
+ sinkFunction.invoke((IN) reuse.getTuple());
+ resetReuse();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 98eb679..903372b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -32,13 +32,21 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
protected Collector<OUT> collector;
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+ StreamRecordSerializer<IN> serializer;
protected StreamRecord<IN> reuse;
+ protected boolean isMutable;
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN>> recordIterator,
- StreamRecordSerializer<IN> serializer) {
+ StreamRecordSerializer<IN> serializer, boolean isMutable) {
this.collector = collector;
this.recordIterator = recordIterator;
+ this.serializer = serializer;
+ this.reuse = serializer.createInstance();
+ this.isMutable = isMutable;
+ }
+
+ protected void resetReuse() {
this.reuse = serializer.createInstance();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
deleted file mode 100644
index 78214e6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSinkInvokable.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class UserSinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN>
- implements Serializable {
-
-
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 8e35af2..811a929 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -43,10 +43,18 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
@Override
public void invoke() throws Exception {
MyIterator it = new MyIterator();
- do {
- reducer.reduce(it, collector);
- it.reset();
- } while (reuse != null);
+ if (this.isMutable) {
+ do {
+ reducer.reduce(it, collector);
+ it.reset();
+ } while (reuse != null);
+ } else {
+ do {
+ reducer.reduce(it, collector);
+ it.reset();
+ } while (reuse != null);
+ }
+
}
public class MyIterator implements Iterator<IN> {
@@ -62,6 +70,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
return false;
} else {
try {
+ resetReuse();
reuse = recordIterator.next(reuse);
} catch (IOException e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 1dba6b8..ac79764 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -35,10 +35,20 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
@Override
public void invoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- if (filterFunction.filter(reuse.getTuple())) {
- collector.collect(reuse.getTuple());
+ if (this.isMutable) {
+ while (recordIterator.next(reuse) != null) {
+ if (filterFunction.filter(reuse.getTuple())) {
+ collector.collect(reuse.getTuple());
+ }
+ }
+ } else {
+ while (recordIterator.next(reuse) != null) {
+ if (filterFunction.filter(reuse.getTuple())) {
+ collector.collect(reuse.getTuple());
+ }
+ resetReuse();
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 36ab1bf..33bda80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -34,8 +34,15 @@ public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
}
public void invoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- flatMapper.flatMap(reuse.getTuple(), collector);
+ if (this.isMutable) {
+ while (recordIterator.next(reuse) != null) {
+ flatMapper.flatMap(reuse.getTuple(), collector);
+ }
+ } else {
+ while (recordIterator.next(reuse) != null) {
+ flatMapper.flatMap(reuse.getTuple(), collector);
+ resetReuse();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 59b9f8f..ff29d15 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -34,8 +34,15 @@ public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskI
@Override
public void invoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- collector.collect(mapper.map(reuse.getTuple()));
+ if (this.isMutable) {
+ while (recordIterator.next(reuse) != null) {
+ collector.collect(mapper.map(reuse.getTuple()));
+ }
+ } else {
+ while (recordIterator.next(reuse) != null) {
+ collector.collect(mapper.map(reuse.getTuple()));
+ resetReuse();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 207d269..f90caf8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -76,6 +76,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
protected int instanceID;
protected String name;
private static int numComponents = 0;
+ protected boolean isMutable;
protected static int newComponent() {
numComponents++;
@@ -259,6 +260,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
StreamComponentInvokable userFunction = null;
byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
+ this.isMutable = configuration.getBoolean("isMutable", false);
try {
userFunction = deserializeObject(userFunctionSerialized);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 508a10e9..1433ebc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -23,9 +23,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.streaming.api.invokable.DefaultSinkInvokable;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
-import org.apache.flink.streaming.api.invokable.UserSinkInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.MutableObjectIterator;
@@ -62,10 +61,10 @@ public class StreamSink<IN extends Tuple> extends AbstractStreamComponent<IN, IN
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
- Class<? extends UserSinkInvokable> userFunctionClass = configuration.getClass(
- "userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
- userFunction = (UserSinkInvokable<IN>) getInvokable(userFunctionClass);
- userFunction.initialize(collector, inputIter, inTupleSerializer);
+ Class<? extends SinkInvokable> userFunctionClass = configuration.getClass("userfunction",
+ SinkInvokable.class, SinkInvokable.class);
+ userFunction = (SinkInvokable<IN>) getInvokable(userFunctionClass);
+ userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
index 4ac7100..108916a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.DefaultSourceInvokable;
import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -74,7 +73,7 @@ public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent<Tup
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
- "userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
+ "userfunction", UserSourceInvokable.class, UserSourceInvokable.class);
userFunction = (UserSourceInvokable<OUT>) getInvokable(userFunctionClass);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 44f93b3..7da19aa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -29,7 +29,6 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.invokable.DefaultTaskInvokable;
import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -79,9 +78,9 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
- "userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
+ "userfunction", UserTaskInvokable.class, UserTaskInvokable.class);
userFunction = (UserTaskInvokable<IN, OUT>) getInvokable(userFunctionClass);
- userFunction.initialize(collector, inputIter, inTupleSerializer);
+ userFunction.initialize(collector, inputIter, inTupleSerializer, isMutable);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
deleted file mode 100755
index 403c863..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamrecord/StreamRecordTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.junit.Test;
-
-public class StreamRecordTest {
-
- @Test
- public void testReadWrite() throws IOException {
- StreamRecord<Tuple2<Integer, String>> streamRecord = new StreamRecord<Tuple2<Integer, String>>();
- Tuple2<Integer, String> tuple = new Tuple2<Integer, String>(2, "a");
- streamRecord.setTuple(tuple).setId(1);
-
- TupleSerializer<Tuple2<Integer, String>> ts = (TupleSerializer<Tuple2<Integer, String>>) TypeExtractor
- .getForObject(tuple).createSerializer();
-
- SerializationDelegate<Tuple2<Integer, String>> sd = new SerializationDelegate<Tuple2<Integer, String>>(
- ts);
- streamRecord.setSeralizationDelegate(sd);
-
- DataOutputSerializer out = new DataOutputSerializer(64);
- streamRecord.write(out);
-
- ByteBuffer buff = out.wrapAsByteBuffer();
-
- DataInputDeserializer in = new DataInputDeserializer(buff);
-
- StreamRecord<Tuple2<Integer, String>> streamRecord2 = new StreamRecord<Tuple2<Integer, String>>();
-
- streamRecord2.setDeseralizationDelegate(
- new DeserializationDelegate<Tuple2<Integer, String>>(ts), ts);
-
- streamRecord2.read(in);
-
- assertEquals(streamRecord.getTuple(), streamRecord2.getTuple());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
index 3a1c8f0..ac5240f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -33,6 +34,7 @@ public class BroadcastPartitionerTest {
private BroadcastPartitioner<Tuple> broadcastPartitioner3;
private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
@Before
public void setPartitioner() {
@@ -47,9 +49,9 @@ public class BroadcastPartitionerTest {
int[] first = new int[] { 0 };
int[] second = new int[] { 0, 1 };
int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-
- assertArrayEquals(first, broadcastPartitioner1.selectChannels(streamRecord, 1));
- assertArrayEquals(second, broadcastPartitioner2.selectChannels(streamRecord, 2));
- assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(streamRecord, 6));
+ sd.setInstance(streamRecord);
+ assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
+ assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
+ assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 77dacf7..716a869 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -31,8 +32,14 @@ import org.junit.Test;
public class FieldsPartitionerTest {
private FieldsPartitioner<Tuple> fieldsPartitioner;
- private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>().setTuple(new Tuple2<String, Integer>("test", 0));
- private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>().setTuple(new Tuple2<String, Integer>("test", 42));
+ private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
+ .setTuple(new Tuple2<String, Integer>("test", 0));
+ private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
+ .setTuple(new Tuple2<String, Integer>("test", 42));
+ private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
+ private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
@Before
public void setPartitioner() {
@@ -41,22 +48,22 @@ public class FieldsPartitionerTest {
@Test
public void testSelectChannelsLength() {
- assertEquals(1,
- fieldsPartitioner.selectChannels(streamRecord1, 1).length);
- assertEquals(1,
- fieldsPartitioner.selectChannels(streamRecord1, 2).length);
- assertEquals(1,
- fieldsPartitioner.selectChannels(streamRecord1, 1024).length);
+ sd1.setInstance(streamRecord1);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
+ assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
}
@Test
public void testSelectChannelsGrouping() {
- assertArrayEquals(fieldsPartitioner.selectChannels(streamRecord1, 1),
- fieldsPartitioner.selectChannels(streamRecord2, 1));
- assertArrayEquals(fieldsPartitioner.selectChannels(streamRecord1, 2),
- fieldsPartitioner.selectChannels(streamRecord2, 2));
- assertArrayEquals(
- fieldsPartitioner.selectChannels(streamRecord1, 1024),
- fieldsPartitioner.selectChannels(streamRecord2, 1024));
+ sd1.setInstance(streamRecord1);
+ sd2.setInstance(streamRecord2);
+
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
+ fieldsPartitioner.selectChannels(sd2, 1));
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
+ fieldsPartitioner.selectChannels(sd2, 2));
+ assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
+ fieldsPartitioner.selectChannels(sd2, 1024));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
index dcf3160..e2703c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -30,6 +31,8 @@ public class GlobalPartitionerTest {
private GlobalPartitioner<Tuple> globalPartitioner;
private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
@Before
public void setPartitioner() {
@@ -40,11 +43,10 @@ public class GlobalPartitionerTest {
public void testSelectChannels() {
int[] result = new int[] { 0 };
- assertArrayEquals(result,
- globalPartitioner.selectChannels(streamRecord, 1));
- assertArrayEquals(result,
- globalPartitioner.selectChannels(streamRecord, 2));
- assertArrayEquals(result,
- globalPartitioner.selectChannels(streamRecord, 1024));
+ sd.setInstance(streamRecord);
+
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
+ assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c21f0e35/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
index 32cc2ab..33881e8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -31,6 +32,8 @@ public class ShufflePartitionerTest {
private ShufflePartitioner<Tuple> shufflePartitioner;
private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+ null);
@Before
public void setPartitioner() {
@@ -39,22 +42,21 @@ public class ShufflePartitionerTest {
@Test
public void testSelectChannelsLength() {
- assertEquals(1,
- shufflePartitioner.selectChannels(streamRecord, 1).length);
- assertEquals(1,
- shufflePartitioner.selectChannels(streamRecord, 2).length);
- assertEquals(1,
- shufflePartitioner.selectChannels(streamRecord, 1024).length);
+ sd.setInstance(streamRecord);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
+ assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
}
@Test
public void testSelectChannelsInterval() {
- assertEquals(0, shufflePartitioner.selectChannels(streamRecord, 1)[0]);
+ sd.setInstance(streamRecord);
+ assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
- assertTrue(0 <= shufflePartitioner.selectChannels(streamRecord, 2)[0]);
- assertTrue(2 > shufflePartitioner.selectChannels(streamRecord, 2)[0]);
+ assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
+ assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
- assertTrue(0 <= shufflePartitioner.selectChannels(streamRecord, 1024)[0]);
- assertTrue(1024 > shufflePartitioner.selectChannels(streamRecord, 1024)[0]);
+ assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
+ assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
}
}