You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/08/01 10:12:58 UTC
[flink] 02/02: [FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd4328dbcea908191b669e53b57b790a4e627ecd
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Tue Jul 30 17:35:06 2019 +0800
[FLINK-13376][datastream] Unify all no operation sinks with DiscardingSink
---
.../StreamingJobGraphGeneratorNodeHashTest.java | 42 +++++++++-------------
.../jar/CheckpointedStreamingProgram.java | 10 ++----
.../jar/StreamingCustomInputSplitProgram.java | 10 ++----
.../test/classloading/jar/StreamingProgram.java | 10 ++----
.../api/ContinuousFileReaderOperatorITCase.java | 6 ++--
.../streaming/runtime/GlobalAggregateITCase.java | 17 ++-------
6 files changed, 27 insertions(+), 68 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 3a44c17..92a67c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
@@ -94,7 +93,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
src0.map(new NoOpMapFunction())
.union(src1, src2)
- .addSink(new NoOpSinkFunction()).name("sink");
+ .addSink(new DiscardingSink<>()).name("sink");
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -121,7 +120,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
src0.map(new NoOpMapFunction())
.union(src1, src2)
- .addSink(new NoOpSinkFunction()).name("sink");
+ .addSink(new DiscardingSink<>()).name("sink");
jobGraph = env.getStreamGraph().getJobGraph();
@@ -146,7 +145,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
DataStream<String> src0 = env.addSource(new NoOpSourceFunction());
DataStream<String> src1 = env.addSource(new NoOpSourceFunction());
- src0.union(src1).addSink(new NoOpSinkFunction());
+ src0.union(src1).addSink(new DiscardingSink<>());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -178,7 +177,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
env.addSource(new NoOpSourceFunction())
.map(new NoOpMapFunction())
.filter(new NoOpFilterFunction())
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -192,7 +191,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
.map(new NoOpMapFunction())
.startNewChain()
.filter(new NoOpFilterFunction())
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
jobGraph = env.getStreamGraph().getJobGraph();
@@ -221,7 +220,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
.map(new NoOpMapFunction()).name("map")
.startNewChain()
.filter(new NoOpFilterFunction())
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -237,7 +236,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
.startNewChain()
.filter(new NoOpFilterFunction())
.startNewChain()
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
jobGraph = env.getStreamGraph().getJobGraph();
@@ -266,9 +265,9 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
DataStream<String> src = env.addSource(new NoOpSourceFunction());
- src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+ src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
- src.map(new NoOpMapFunction()).addSink(new NoOpSinkFunction());
+ src.map(new NoOpMapFunction()).addSink(new DiscardingSink<>());
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
Set<JobVertexID> vertexIds = new HashSet<>();
@@ -326,11 +325,11 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
.name("source").uid("source");
src.map(new NoOpMapFunction())
- .addSink(new NoOpSinkFunction())
+ .addSink(new DiscardingSink<>())
.name("sink0").uid("sink0");
src.map(new NoOpMapFunction())
- .addSink(new NoOpSinkFunction())
+ .addSink(new DiscardingSink<>())
.name("sink1").uid("sink1");
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
@@ -352,13 +351,13 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
src.map(new NoOpMapFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction())
- .addSink(new NoOpSinkFunction())
+ .addSink(new DiscardingSink<>())
.name("sink0").uid("sink0");
src.map(new NoOpMapFunction())
.keyBy(new NoOpKeySelector())
.reduce(new NoOpReduceFunction())
- .addSink(new NoOpSinkFunction())
+ .addSink(new DiscardingSink<>())
.name("sink1").uid("sink1");
JobGraph newJobGraph = env.getStreamGraph().getJobGraph();
@@ -386,7 +385,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
env.addSource(new NoOpSourceFunction()).uid("source")
.map(new NoOpMapFunction()).uid("source") // Collision
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
// This call is necessary to generate the job graph
env.getStreamGraph().getJobGraph();
@@ -403,7 +402,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
env.addSource(new NoOpSourceFunction())
// Intermediate chained node
.map(new NoOpMapFunction()).uid("map")
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
env.getStreamGraph().getJobGraph();
}
@@ -418,7 +417,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
env.addSource(new NoOpSourceFunction()).uid("source")
.map(new NoOpMapFunction())
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
env.getStreamGraph().getJobGraph();
}
@@ -547,15 +546,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
}
}
- private static class NoOpSinkFunction implements SinkFunction<String> {
-
- private static final long serialVersionUID = -5654199886203297279L;
-
- @Override
- public void invoke(String value) throws Exception {
- }
- }
-
private static class NoOpMapFunction implements MapFunction<String, String> {
private static final long serialVersionUID = 6584823409744624276L;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 0503c93..6fe2e5a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Collections;
@@ -48,7 +48,7 @@ public class CheckpointedStreamingProgram {
env.disableOperatorChaining();
DataStream<String> text = env.addSource(new SimpleStringGenerator());
- text.map(new StatefulMapper()).addSink(new NoOpSink());
+ text.map(new StatefulMapper()).addSink(new DiscardingSink<>());
env.setParallelism(1);
env.execute("Checkpointed Streaming Program");
}
@@ -133,10 +133,4 @@ public class CheckpointedStreamingProgram {
private static class SuccessException extends Exception {
}
-
- private static class NoOpSink implements SinkFunction<String>{
- @Override
- public void invoke(String value) throws Exception {
- }
- }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
index 26fe96a..0904544 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import java.io.Serializable;
import java.util.ArrayList;
@@ -60,7 +60,7 @@ public class StreamingCustomInputSplitProgram {
public Tuple2<Integer, Double> map(Integer value) throws Exception {
return new Tuple2<Integer, Double>(value, value * 0.5);
}
- }).addSink(new NoOpSink());
+ }).addSink(new DiscardingSink<>());
env.execute();
}
@@ -167,10 +167,4 @@ public class StreamingCustomInputSplitProgram {
}
}
}
-
- private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
- @Override
- public void invoke(Tuple2<Integer, Double> value) throws Exception {
- }
- }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 596e4dd..b2414a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.classloading.jar;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.Collector;
@@ -42,7 +42,7 @@ public class StreamingProgram {
DataStream<Word> counts =
text.flatMap(new Tokenizer()).keyBy("word").sum("frequency");
- counts.addSink(new NoOpSink());
+ counts.addSink(new DiscardingSink<>());
env.execute();
}
@@ -95,10 +95,4 @@ public class StreamingProgram {
}
}
}
-
- private static class NoOpSink implements SinkFunction<Word>{
- @Override
- public void invoke(Word value) throws Exception {
- }
- }
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
index b681114..8d9995f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/ContinuousFileReaderOperatorITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.streaming.api;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -61,7 +61,7 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
TestBoundedOneInputStreamOperator checkingOperator = new TestBoundedOneInputStreamOperator(elementCount);
DataStream<String> endInputChecking = source.transform("EndInputChecking", STRING_TYPE_INFO, checkingOperator);
- endInputChecking.addSink(new NoOpSink());
+ endInputChecking.addSink(new DiscardingSink<>());
env.execute("ContinuousFileReaderOperatorITCase.testEndInput");
}
@@ -95,6 +95,4 @@ public class ContinuousFileReaderOperatorITCase extends AbstractTestBase {
processedElementCount++;
}
}
-
- private static class NoOpSink implements SinkFunction<String> {}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
index 5e2bb7d..0266f9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/GlobalAggregateITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.util.AbstractTestBase;
@@ -45,7 +45,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
streamExecutionEnvironment
.addSource(new TestSourceFunction(new IntegerAggregateFunction(), false))
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
streamExecutionEnvironment.execute();
}
@@ -56,7 +56,7 @@ public class GlobalAggregateITCase extends AbstractTestBase {
streamExecutionEnvironment
.addSource(new TestSourceFunction(new ExceptionThrowingAggregateFunction(), true))
- .addSink(new NoOpSinkFunction());
+ .addSink(new DiscardingSink<>());
streamExecutionEnvironment.execute();
}
@@ -166,15 +166,4 @@ public class GlobalAggregateITCase extends AbstractTestBase {
return add(accumulatorA, accumulatorB);
}
}
-
- /**
- * Sink function that does nothing.
- */
- private static class NoOpSinkFunction implements SinkFunction<Integer> {
-
- @Override
- public void invoke(Integer value, Context context) throws Exception {
-
- }
- }
}