You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/01 10:12:58 UTC

[flink] 02/02: [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd4328dbcea908191b669e53b57b790a4e627ecd
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Tue Jul 30 17:35:06 2019 +0800

    [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink
---
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 42 +++++++++-------------
 .../jar/CheckpointedStreamingProgram.java          | 10 ++----
 .../jar/StreamingCustomInputSplitProgram.java      | 10 ++----
 .../test/classloading/jar/StreamingProgram.java    | 10 ++----
 .../api/ContinuousFileReaderOperatorITCase.java    |  6 ++--
 .../streaming/runtime/GlobalAggregateITCase.java   | 17 ++-------
 6 files changed, 27 insertions(+), 68 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 3a44c17..92a67c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -94,7 +93,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		src0.map(new NoOpMapFunction())
 				.union(src1, src2)
-				.addSink(new NoOpSinkFunction()).name("sink");
+				.addSink(new DiscardingSink<>()).name("sink");
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -121,7 +120,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		src0.map(new NoOpMapFunction())
 				.union(src1, src2)
-				.addSink(new NoOpSinkFunction()).name("sink");
+				.addSink(new DiscardingSink<>()).name("sink");
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -146,7 +145,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		DataStream<String> src0 = env.addSource(new NoOpSourceFunction());
 		DataStream<String> src1 = env.addSource(new NoOpSourceFunction());
 
-		src0.union(src1).addSink(new NoOpSinkFunction());
+		src0.union(src1).addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -178,7 +177,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		env.addSource(new NoOpSourceFunction())
 				.map(new NoOpMapFunction())
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -192,7 +191,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.map(new NoOpMapFunction())
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -221,7 +220,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.map(new NoOpMapFunction()).name("map")
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -237,7 +236,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.startNewChain()
 				.filter(new NoOpFilterFunction())
 				.startNewChain()
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		jobGraph = env.getStreamGraph().getJobGraph();
 
@@ -266,9 +265,9 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		DataStream<String> src = env.addSource(new NoOpSourceFunction());
 
-		src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+		src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
 
-		src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+		src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 		Set<JobVertexID> vertexIds = new HashSet<>();
@@ -326,11 +325,11 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.name("source").uid("source");
 
 		src.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink0").uid("sink0");
 
 		src.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink1").uid("sink1");
 
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -352,13 +351,13 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		src.map(new NoOpMapFunction())
 				.keyBy(new NoOpKeySelector())
 				.reduce(new NoOpReduceFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink0").uid("sink0");
 
 		src.map(new NoOpMapFunction())
 				.keyBy(new NoOpKeySelector())
 				.reduce(new NoOpReduceFunction())
-				.addSink(new NoOpSinkFunction())
+				.addSink(new DiscardingSink<>())
 				.name("sink1").uid("sink1");
 
 		JobGraph newJobGraph = env.getStreamGraph().getJobGraph();
@@ -386,7 +385,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		env.addSource(new NoOpSourceFunction()).uid("source")
 				.map(new NoOpMapFunction()).uid("source") // Collision
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		// This call is necessary to generate the job graph
 		env.getStreamGraph().getJobGraph();
@@ -403,7 +402,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		env.addSource(new NoOpSourceFunction())
 				// Intermediate chained node
 				.map(new NoOpMapFunction()).uid("map")
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		env.getStreamGraph().getJobGraph();
 	}
@@ -418,7 +417,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 		env.addSource(new NoOpSourceFunction()).uid("source")
 				.map(new NoOpMapFunction())
-				.addSink(new NoOpSinkFunction());
+				.addSink(new DiscardingSink<>());
 
 		env.getStreamGraph().getJobGraph();
 	}
@@ -547,15 +546,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 		}
 	}
 
-	private static class NoOpSinkFunction implements SinkFunction<String> {
-
-		private static final long serialVersionUID = -5654199886203297279L;
-
-		@Override
-		public void invoke(String value) throws Exception {
-		}
-	}
-
 	private static class NoOpMapFunction implements MapFunction<String, String> {
 
 		private static final long serialVersionUID = 6584823409744624276L;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 0503c93..6fe2e5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.Collections;
@@ -48,7 +48,7 @@ public class CheckpointedStreamingProgram {
 		env.disableOperatorChaining();
 
 		DataStream<String> text = env.addSource(new SimpleStringGenerator());
-		text.map(new StatefulMapper()).addSink(new NoOpSink());
+		text.map(new StatefulMapper()).addSink(new DiscardingSink<>());
 		env.setParallelism(1);
 		env.execute("Checkpointed Streaming Program");
 	}
@@ -133,10 +133,4 @@ public class CheckpointedStreamingProgram {
 	private static class SuccessException extends Exception {
 
 	}
-
-	private static class NoOpSink implements SinkFunction<String>{
-		@Override
-		public void invoke(String value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 26fe96a..0904544 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ public class StreamingCustomInputSplitProgram {
 			public Tuple2<Integer, Double> map(Integer value) throws Exception {
 				return new Tuple2<Integer, Double>(value, value * 0.5);
 			}
-		}).addSink(new NoOpSink());
+		}).addSink(new DiscardingSink<>());
 
 		env.execute();
 	}
@@ -167,10 +167,4 @@ public class StreamingCustomInputSplitProgram {
 			}
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
-		@Override
-		public void invoke(Tuple2<Integer, Double> value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 596e4dd..b2414a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.classloading.jar;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
@@ -42,7 +42,7 @@ public class StreamingProgram {
 		DataStream<Word> counts =
 				text.flatMap(new Tokenizer()).keyBy("word").sum("frequency");
 
-		counts.addSink(new NoOpSink());
+		counts.addSink(new DiscardingSink<>());
 
 		env.execute();
 	}
@@ -95,10 +95,4 @@ public class StreamingProgram {
 			}
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<Word>{
-		@Override
-		public void invoke(Word value) throws Exception {
-		}
-	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
index b681114..8d9995f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.streaming.api;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -61,7 +61,7 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
 		TestBoundedOneInputStreamOperator checkingOperator = new TestBoundedOneInputStreamOperator(elementCount);
 		DataStream<String> endInputChecking = source.transform("EndInputChecking", STRING_TYPE_INFO, checkingOperator);
 
-		endInputChecking.addSink(new NoOpSink());
+		endInputChecking.addSink(new DiscardingSink<>());
 
 		env.execute("ContinuousFileReaderOperatorITCase.testEndInput");
 	}
@@ -95,6 +95,4 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
 			processedElementCount++;
 		}
 	}
-
-	private static class NoOpSink implements SinkFunction<String> {}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
index 5e2bb7d..0266f9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -45,7 +45,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 
 		streamExecutionEnvironment
 			.addSource(new TestSourceFunction(new IntegerAggregateFunction(), false))
-			.addSink(new NoOpSinkFunction());
+			.addSink(new DiscardingSink<>());
 
 		streamExecutionEnvironment.execute();
 	}
@@ -56,7 +56,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 
 		streamExecutionEnvironment
 			.addSource(new TestSourceFunction(new ExceptionThrowingAggregateFunction(), true))
-			.addSink(new NoOpSinkFunction());
+			.addSink(new DiscardingSink<>());
 
 		streamExecutionEnvironment.execute();
 	}
@@ -166,15 +166,4 @@ public class GlobalAggregateITCase extends AbstractTestBase {
 			return add(accumulatorA, accumulatorB);
 		}
 	}
-
-	/**
-	 * Sink function that does nothing.
-	 */
-	private static class NoOpSinkFunction implements SinkFunction<Integer> {
-
-		@Override
-		public void invoke(Integer value, Context context) throws Exception {
-
-		}
-	}
 }