You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/10 14:33:43 UTC
[1/2] flink git commit: [FLINK-2992] Remove use of SerializationUtils
Repository: flink
Updated Branches:
refs/heads/release-0.10 571665927 -> c0fe30578
[FLINK-2992] Remove use of SerializationUtils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0fe3057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0fe3057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0fe3057
Branch: refs/heads/release-0.10
Commit: c0fe305781344089493e0ef9ca27fdcdafd9aac4
Parents: c098377
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 10 12:29:17 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 10 12:40:45 2015 +0100
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 7 +------
.../api/datastream/WindowedStream.java | 7 +------
.../streaming/api/operators/StreamSource.java | 2 +-
.../windowing/NonKeyedWindowOperator.java | 22 ++++++++------------
.../operators/windowing/WindowOperator.java | 21 ++++++++-----------
tools/maven/checkstyle.xml | 7 +++++++
6 files changed, 28 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 7191304..989e762 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -153,13 +152,9 @@ public class AllWindowedStream<T, W extends Window> {
evictor).enableSetProcessingTime(setProcessingTime);
} else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
operator = new NonKeyedWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceAllWindowFunction<W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 033e84f..9dbee30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -167,15 +166,11 @@ public class WindowedStream<T, K, W extends Window> {
evictor).enableSetProcessingTime(setProcessingTime);
} else {
- // we need to copy because we need our own instance of the pre aggregator
- @SuppressWarnings("unchecked")
- ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
operator = new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
- new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+ new PreAggregatingHeapWindowBuffer.Factory<>(function),
new ReduceWindowFunction<K, W, T>(function),
trigger).enableSetProcessingTime(setProcessingTime);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 65c11b6..56426f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -65,7 +65,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
public void cancel() {
userFunction.cancel();
// the context may not be initialized if the source was never running.
- if(ctx != null) {
+ if (ctx != null) {
ctx.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index cf90cf2..7daaca4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,11 +41,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
@@ -372,7 +370,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@SuppressWarnings("unchecked")
- protected Context(DataInputView in) throws Exception {
+ protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
this.window = windowSerializer.deserialize(in);
this.watermarkTimer = in.readLong();
this.processingTimeTimer = in.readLong();
@@ -380,8 +378,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
int stateSize = in.readInt();
byte[] stateData = new byte[stateSize];
in.read(stateData);
- ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
- state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+ state = InstantiationUtil.deserializeObject(stateData, userClassloader);
this.windowBuffer = windowBufferFactory.create();
int numElements = in.readInt();
@@ -396,10 +393,9 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
out.writeLong(watermarkTimer);
out.writeLong(processingTimeTimer);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SerializationUtils.serialize(state, baos);
- out.writeInt(baos.size());
- out.write(baos.toByteArray(), 0, baos.size());
+ byte[] serializedState = InstantiationUtil.serializeObject(state);
+ out.writeInt(serializedState.length);
+ out.write(serializedState, 0, serializedState.length);
MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
out.writeInt(windowBuffer.size());
@@ -534,10 +530,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
public void restoreState(StreamTaskState taskState) throws Exception {
super.restoreState(taskState);
-
+ final ClassLoader userClassloader = getUserCodeClassloader();
@SuppressWarnings("unchecked")
StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(getUserCodeClassloader());
+ DataInputView in = inputState.getState(userClassloader);
int numWindows = in.readInt();
this.windows = new HashMap<>(numWindows);
@@ -545,7 +541,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
this.watermarkTimers = new HashMap<>();
for (int j = 0; j < numWindows; j++) {
- Context context = new Context(in);
+ Context context = new Context(in, userClassloader);
windows.put(context.window, context);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6764186..1c5a70c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,11 +42,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
@@ -436,7 +434,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
* {@link #writeToState(StateBackend.CheckpointStateOutputView)}
*/
@SuppressWarnings("unchecked")
- protected Context(DataInputView in) throws Exception {
+ protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
this.key = keySerializer.deserialize(in);
this.window = windowSerializer.deserialize(in);
this.watermarkTimer = in.readLong();
@@ -445,8 +443,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
int stateSize = in.readInt();
byte[] stateData = new byte[stateSize];
in.read(stateData);
- ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
- state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+ state = InstantiationUtil.deserializeObject(stateData, userClassloader);
this.windowBuffer = windowBufferFactory.create();
int numElements = in.readInt();
@@ -465,10 +462,9 @@ public class WindowOperator<K, IN, OUT, W extends Window>
out.writeLong(watermarkTimer);
out.writeLong(processingTimeTimer);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SerializationUtils.serialize(state, baos);
- out.writeInt(baos.size());
- out.write(baos.toByteArray(), 0, baos.size());
+ byte[] serializedState = InstantiationUtil.serializeObject(state);
+ out.writeInt(serializedState.length);
+ out.write(serializedState, 0, serializedState.length);
MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
out.writeInt(windowBuffer.size());
@@ -608,10 +604,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
public void restoreState(StreamTaskState taskState) throws Exception {
super.restoreState(taskState);
+ final ClassLoader userClassloader = getUserCodeClassloader();
@SuppressWarnings("unchecked")
StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
- DataInputView in = inputState.getState(getUserCodeClassloader());
+ DataInputView in = inputState.getState(userClassloader);
int numKeys = in.readInt();
this.windows = new HashMap<>(numKeys);
@@ -621,7 +618,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
for (int i = 0; i < numKeys; i++) {
int numWindows = in.readInt();
for (int j = 0; j < numWindows; j++) {
- Context context = new Context(in);
+ Context context = new Context(in, userClassloader);
Map<W, Context> keyWindows = windows.get(context.key);
if (keyWindows == null) {
keyWindows = new HashMap<>(numWindows);
http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index ecedb19..6b76c18 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -49,11 +49,18 @@ under the License.
<module name="IllegalImport">
<property name="illegalPkgs" value="org.apache.flink.shaded"/>
</module>
+ <!-- forbid use of commons lang validate -->
<module name="Regexp">
<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
<property name="illegalPattern" value="true"/>
<property name="message" value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
</module>
+ <!-- forbid the use of org.apache.commons.lang.SerializationUtils -->
+ <module name="Regexp">
+ <property name="format" value="org\.apache\.commons\.lang\.SerializationUtils"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use Flink's InstantiationUtil instead of common's SerializationUtils"/>
+ </module>
<module name="NeedBraces">
<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>
</module>
[2/2] flink git commit: [hotfix] Check for null in
StreamSource.cancel()
Posted by rm...@apache.org.
[hotfix] Check for null in StreamSource.cancel()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0983773
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0983773
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0983773
Branch: refs/heads/release-0.10
Commit: c09837732fdc7fdb3a65a9d4bffaff38887cee5d
Parents: 5716659
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 10 11:32:20 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 10 12:40:45 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/streaming/api/operators/StreamSource.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0983773/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index fbecbd1..65c11b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -64,7 +64,10 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
public void cancel() {
userFunction.cancel();
- ctx.close();
+ // the context may not be initialized if the source was never running.
+ if(ctx != null) {
+ ctx.close();
+ }
}
/**