You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/10 14:33:43 UTC

[1/2] flink git commit: [FLINK-2992] Remove use of SerializationUtils

Repository: flink
Updated Branches:
  refs/heads/release-0.10 571665927 -> c0fe30578


[FLINK-2992] Remove use of SerializationUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0fe3057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0fe3057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0fe3057

Branch: refs/heads/release-0.10
Commit: c0fe305781344089493e0ef9ca27fdcdafd9aac4
Parents: c098377
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 10 12:29:17 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 10 12:40:45 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  7 +------
 .../api/datastream/WindowedStream.java          |  7 +------
 .../streaming/api/operators/StreamSource.java   |  2 +-
 .../windowing/NonKeyedWindowOperator.java       | 22 ++++++++------------
 .../operators/windowing/WindowOperator.java     | 21 ++++++++-----------
 tools/maven/checkstyle.xml                      |  7 +++++++
 6 files changed, 28 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 7191304..989e762 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -153,13 +152,9 @@ public class AllWindowedStream<T, W extends Window> {
 					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new PreAggregatingHeapWindowBuffer.Factory<>(function),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 033e84f..9dbee30 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -167,15 +166,11 @@ public class WindowedStream<T, K, W extends Window> {
 					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
-			// we need to copy because we need our own instance of the pre aggregator
-			@SuppressWarnings("unchecked")
-			ReduceFunction<T> functionCopy = (ReduceFunction<T>) SerializationUtils.clone(function);
-
 			operator = new WindowOperator<>(windowAssigner,
 					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
 					keySel,
 					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
-					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
+					new PreAggregatingHeapWindowBuffer.Factory<>(function),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger).enableSetProcessingTime(setProcessingTime);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 65c11b6..56426f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -65,7 +65,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 	public void cancel() {
 		userFunction.cancel();
 		// the context may not be initialized if the source was never running.
-		if(ctx != null) {
+		if (ctx != null) {
 			ctx.close();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index cf90cf2..7daaca4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -42,11 +41,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
@@ -372,7 +370,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 
 		@SuppressWarnings("unchecked")
-		protected Context(DataInputView in) throws Exception {
+		protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
 			this.window = windowSerializer.deserialize(in);
 			this.watermarkTimer = in.readLong();
 			this.processingTimeTimer = in.readLong();
@@ -380,8 +378,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			int stateSize = in.readInt();
 			byte[] stateData = new byte[stateSize];
 			in.read(stateData);
-			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
-			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+			state = InstantiationUtil.deserializeObject(stateData, userClassloader);
 
 			this.windowBuffer = windowBufferFactory.create();
 			int numElements = in.readInt();
@@ -396,10 +393,9 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			out.writeLong(watermarkTimer);
 			out.writeLong(processingTimeTimer);
 
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			SerializationUtils.serialize(state, baos);
-			out.writeInt(baos.size());
-			out.write(baos.toByteArray(), 0, baos.size());
+			byte[] serializedState = InstantiationUtil.serializeObject(state);
+			out.writeInt(serializedState.length);
+			out.write(serializedState, 0, serializedState.length);
 
 			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
 			out.writeInt(windowBuffer.size());
@@ -534,10 +530,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	public void restoreState(StreamTaskState taskState) throws Exception {
 		super.restoreState(taskState);
 
-
+		final ClassLoader userClassloader = getUserCodeClassloader();
 		@SuppressWarnings("unchecked")
 		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(getUserCodeClassloader());
+		DataInputView in = inputState.getState(userClassloader);
 
 		int numWindows = in.readInt();
 		this.windows = new HashMap<>(numWindows);
@@ -545,7 +541,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		this.watermarkTimers = new HashMap<>();
 
 		for (int j = 0; j < numWindows; j++) {
-			Context context = new Context(in);
+			Context context = new Context(in, userClassloader);
 			windows.put(context.window, context);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 6764186..1c5a70c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,11 +42,10 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
@@ -436,7 +434,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		 * {@link #writeToState(StateBackend.CheckpointStateOutputView)}
 		 */
 		@SuppressWarnings("unchecked")
-		protected Context(DataInputView in) throws Exception {
+		protected Context(DataInputView in, ClassLoader userClassloader) throws Exception {
 			this.key = keySerializer.deserialize(in);
 			this.window = windowSerializer.deserialize(in);
 			this.watermarkTimer = in.readLong();
@@ -445,8 +443,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 			int stateSize = in.readInt();
 			byte[] stateData = new byte[stateSize];
 			in.read(stateData);
-			ByteArrayInputStream bais = new ByteArrayInputStream(stateData);
-			state = (HashMap<String, Serializable>) SerializationUtils.deserialize(bais);
+			state = InstantiationUtil.deserializeObject(stateData, userClassloader);
 
 			this.windowBuffer = windowBufferFactory.create();
 			int numElements = in.readInt();
@@ -465,10 +462,9 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 			out.writeLong(watermarkTimer);
 			out.writeLong(processingTimeTimer);
 
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			SerializationUtils.serialize(state, baos);
-			out.writeInt(baos.size());
-			out.write(baos.toByteArray(), 0, baos.size());
+			byte[] serializedState = InstantiationUtil.serializeObject(state);
+			out.writeInt(serializedState.length);
+			out.write(serializedState, 0, serializedState.length);
 
 			MultiplexingStreamRecordSerializer<IN> recordSerializer = new MultiplexingStreamRecordSerializer<>(inputSerializer);
 			out.writeInt(windowBuffer.size());
@@ -608,10 +604,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	public void restoreState(StreamTaskState taskState) throws Exception {
 		super.restoreState(taskState);
 
+		final ClassLoader userClassloader = getUserCodeClassloader();
 
 		@SuppressWarnings("unchecked")
 		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(getUserCodeClassloader());
+		DataInputView in = inputState.getState(userClassloader);
 
 		int numKeys = in.readInt();
 		this.windows = new HashMap<>(numKeys);
@@ -621,7 +618,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		for (int i = 0; i < numKeys; i++) {
 			int numWindows = in.readInt();
 			for (int j = 0; j < numWindows; j++) {
-				Context context = new Context(in);
+				Context context = new Context(in, userClassloader);
 				Map<W, Context> keyWindows = windows.get(context.key);
 				if (keyWindows == null) {
 					keyWindows = new HashMap<>(numWindows);

http://git-wip-us.apache.org/repos/asf/flink/blob/c0fe3057/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index ecedb19..6b76c18 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -49,11 +49,18 @@ under the License.
 		<module name="IllegalImport">
 			<property name="illegalPkgs" value="org.apache.flink.shaded"/>
 		</module>
+		<!-- forbid use of commons lang validate -->
 		<module name="Regexp">
 			<property name="format" value="org\.apache\.commons\.lang3\.Validate"/>
 			<property name="illegalPattern" value="true"/>
 			<property name="message" value="Use Guava Checks instead of Commons Validate. Please refer to the coding guidelines."/>
 		</module>
+		<!-- forbid the use of org.apache.commons.lang.SerializationUtils -->
+		<module name="Regexp">
+			<property name="format" value="org\.apache\.commons\.lang\.SerializationUtils"/>
+			<property name="illegalPattern" value="true"/>
+			<property name="message" value="Use Flink's InstantiationUtil instead of common's SerializationUtils"/>
+		</module>
 		<module name="NeedBraces">
 			<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>
 		</module>


[2/2] flink git commit: [hotfix] Check for null in StreamSource.cancel()

Posted by rm...@apache.org.
[hotfix] Check for null in StreamSource.cancel()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0983773
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0983773
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0983773

Branch: refs/heads/release-0.10
Commit: c09837732fdc7fdb3a65a9d4bffaff38887cee5d
Parents: 5716659
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 10 11:32:20 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 10 12:40:45 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/operators/StreamSource.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0983773/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index fbecbd1..65c11b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -64,7 +64,10 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 	public void cancel() {
 		userFunction.cancel();
-		ctx.close();
+		// the context may not be initialized if the source was never running.
+		if(ctx != null) {
+			ctx.close();
+		}
 	}
 
 	/**