You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/08 16:43:36 UTC

flink git commit: [FLINK-2480][test] add a prefix test for PrintSinkFunction

Repository: flink
Updated Branches:
  refs/heads/master 97fb9a47c -> 24f7fa9ef


[FLINK-2480][test] add a prefix test for PrintSinkFunction

- improve test layout

This closes #1073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24f7fa9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24f7fa9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24f7fa9e

Branch: refs/heads/master
Commit: 24f7fa9ef73c9f8b08d85296323a264b33a5b9ab
Parents: 97fb9a4
Author: HuangWHWHW <40...@qq.com>
Authored: Wed Sep 2 17:47:56 2015 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 8 16:42:55 2015 +0200

----------------------------------------------------------------------
 .../api/functions/PrintSinkFunctionTest.java    | 79 ++++++++++----------
 1 file changed, 41 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24f7fa9e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index 0dba4f9..ac23cda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,51 +19,30 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
-import static org.junit.Assert.*;
-
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
  */
-public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = -7194618347883773533L;
+public class PrintSinkFunctionTest {
 
 	public PrintStream printStreamOriginal = System.out;
-
-	public class printStreamMock extends PrintStream{
-
-		public String result;
-
-		public printStreamMock(OutputStream out) {
-			super(out);
-		}
-
-		@Override
-		public void println(String x) {
-			this.result = x;
-		}
-	}
-
-	public OutputStream out = new OutputStream() {
-		@Override
-		public void write(int b) throws IOException {
-
-		}
-	};
+	private String line = System.lineSeparator();
 
 	@Test
 	public void testPrintSinkStdOut(){
-
-		printStreamMock stream = new printStreamMock(out);
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintStream stream = new PrintStream(baos);
 		System.setOut(stream);
 
 		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
@@ -73,21 +52,22 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
 		try {
 			printSink.open(new Configuration());
 		} catch (Exception e) {
-			e.printStackTrace();
+			Assert.fail();
 		}
 		printSink.setTargetToStandardOut();
 		printSink.invoke("hello world!");
 
 		assertEquals("Print to System.out", printSink.toString());
-		assertEquals("hello world!", stream.result);
+		assertEquals("hello world!" + line, baos.toString());
 
 		printSink.close();
+		stream.close();
 	}
 
 	@Test
 	public void testPrintSinkStdErr(){
-
-		printStreamMock stream = new printStreamMock(out);
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintStream stream = new PrintStream(baos);
 		System.setOut(stream);
 
 		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
@@ -97,20 +77,43 @@ public class PrintSinkFunctionTest<IN> extends RichSinkFunction<IN> {
 		try {
 			printSink.open(new Configuration());
 		} catch (Exception e) {
-			e.printStackTrace();
+			Assert.fail();
 		}
 		printSink.setTargetToStandardErr();
 		printSink.invoke("hello world!");
 
 		assertEquals("Print to System.err", printSink.toString());
-		assertEquals("hello world!", stream.result);
+		assertEquals("hello world!" + line, baos.toString());
 
 		printSink.close();
+		stream.close();
 	}
 
-	@Override
-	public void invoke(IN record) {
+	@Test
+	public void testPrintSinkWithPrefix(){
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintStream stream = new PrintStream(baos);
+		System.setOut(stream);
+
+		final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+		Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+		Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
 
+		PrintSinkFunction<String> printSink = new PrintSinkFunction<>();
+		printSink.setRuntimeContext(ctx);
+		try {
+			printSink.open(new Configuration());
+		} catch (Exception e) {
+			Assert.fail();
+		}
+		printSink.setTargetToStandardErr();
+		printSink.invoke("hello world!");
+
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("2> hello world!" + line, baos.toString());
+
+		printSink.close();
+		stream.close();
 	}
 
 	@After