You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/08 16:43:36 UTC
flink git commit: [FLINK-2480][test] add a prefix test for
PrintSinkFunction
Repository: flink
Updated Branches:
refs/heads/master 97fb9a47c -> 24f7fa9ef
[FLINK-2480][test] add a prefix test for PrintSinkFunction
- improve test layout
This closes #1073.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24f7fa9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24f7fa9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24f7fa9e
Branch: refs/heads/master
Commit: 24f7fa9ef73c9f8b08d85296323a264b33a5b9ab
Parents: 97fb9a4
Author: HuangWHWHW <40...@qq.com>
Authored: Wed Sep 2 17:47:56 2015 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 8 16:42:55 2015 +0200
----------------------------------------------------------------------
.../api/functions/PrintSinkFunctionTest.java | 79 ++++++++++----------
1 file changed, 41 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24f7fa9e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index 0dba4f9..ac23cda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,51 +19,30 @@ package org.apache.flink.streaming.api.functions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import static org.junit.Assert.*;
-
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
*/
-public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = -7194618347883773533L;
+public class PrintSinkFunctionTest {
public PrintStream printStreamOriginal = System.out;
-
- public class printStreamMock extends PrintStream{
-
- public String result;
-
- public printStreamMock(OutputStream out) {
- super(out);
- }
-
- @Override
- public void println(String x) {
- this.result = x;
- }
- }
-
- public OutputStream out = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
-
- }
- };
+ private String line = System.lineSeparator();
@Test
public void testPrintSinkStdOut(){
-
- printStreamMock stream = new printStreamMock(out);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream stream = new PrintStream(baos);
System.setOut(stream);
final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
@@ -73,21 +52,22 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
try {
printSink.open(new Configuration());
} catch (Exception e) {
- e.printStackTrace();
+ Assert.fail();
}
printSink.setTargetToStandardOut();
printSink.invoke("hello world!");
assertEquals("Print to System.out", printSink.toString());
- assertEquals("hello world!", stream.result);
+ assertEquals("hello world!" + line, baos.toString());
printSink.close();
+ stream.close();
}
@Test
public void testPrintSinkStdErr(){
-
- printStreamMock stream = new printStreamMock(out);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream stream = new PrintStream(baos);
System.setOut(stream);
final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
@@ -97,20 +77,43 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
try {
printSink.open(new Configuration());
} catch (Exception e) {
- e.printStackTrace();
+ Assert.fail();
}
printSink.setTargetToStandardErr();
printSink.invoke("hello world!");
assertEquals("Print to System.err", printSink.toString());
- assertEquals("hello world!", stream.result);
+ assertEquals("hello world!" + line, baos.toString());
printSink.close();
+ stream.close();
}
- @Override
- public void invoke(IN record) {
+ @Test
+ public void testPrintSinkWithPrefix(){
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream stream = new PrintStream(baos);
+ System.setOut(stream);
+
+ final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+ Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+ Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+ PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
+ printSink.setRuntimeContext(ctx);
+ try {
+ printSink.open(new Configuration());
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ printSink.setTargetToStandardErr();
+ printSink.invoke("hello world!");
+
+ assertEquals("Print to System.err", printSink.toString());
+ assertEquals("2> hello world!" + line, baos.toString());
+
+ printSink.close();
+ stream.close();
}
@After