You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 13:22:29 UTC

[2/2] incubator-flink git commit: [streaming] DataStream print functionality update

[streaming] DataStream print functionality update

PrintSinkFunction now explicitly states threads in output
Added printToErr functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b5ac6ec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b5ac6ec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b5ac6ec7

Branch: refs/heads/master
Commit: b5ac6ec7d97edcd224ac990c1f0314bc8acdfa9e
Parents: 61b023f
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 12:48:09 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Tue Dec 16 23:39:05 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  17 ++-
 .../api/function/sink/PrintSinkFunction.java    | 128 ++++++++++++++-----
 .../streamvertex/StreamingRuntimeContext.java   |   2 +-
 3 files changed, 112 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 978f5fa..3fc685a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -780,7 +780,7 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Writes a DataStream to the standard output stream (stdout). For each
+	 * Writes a DataStream to the standard output stream (stdout).<br> For each
 	 * element of the DataStream the result of {@link Object#toString()} is
 	 * written.
 	 * 
@@ -793,6 +793,21 @@ public class DataStream<OUT> {
 
 		return returnStream;
 	}
+	
+	/**
+	 * Writes a DataStream to the standard error stream (stderr).<br> For each
+	 * element of the DataStream the result of {@link Object#toString()} is
+	 * written.
+	 * 
+	 * @return The closed DataStream.
+	 */
+	public DataStreamSink<OUT> printToErr() {
+		DataStream<OUT> inputStream = this.copy();
+		PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
+		DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
+
+		return returnStream;
+	}
 
 	/**
 	 * Writes a DataStream to the file specified by path in text format. For

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index fc75da7..d460749 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -1,36 +1,98 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-
-/**
- * Dummy implementation of the SinkFunction writing every tuple to the standard
- * output. Used for print.
- * 
- * @param <IN>
- *            Input tuple type
- */
-public class PrintSinkFunction<IN> implements SinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke(IN tuple) {
-		System.out.println(tuple);
-	}
-
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+
+/**
+ * Implementation of the SinkFunction writing every tuple to the standard
+ * output or standard error stream.
+ * 
+ * @param <IN>
+ *            Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final boolean STD_OUT = false;
+	private static final boolean STD_ERR = true;
+	
+	private boolean target; 
+	private transient PrintStream stream;
+	private transient String prefix;
+	
+	/**
+	 * Instantiates a print sink function that prints to standard out.
+	 */
+	public PrintSinkFunction() {}
+	
+	/**
+	 * Instantiates a print sink function that prints to standard out or standard error.
+	 * 
+	 * @param stdErr True, if the sink should print to standard error instead of standard out.
+	 */
+	public PrintSinkFunction(boolean stdErr) {
+		target = stdErr;
+	}
+
+	public void setTargetToStandardOut() {
+		target = STD_OUT;
+	}
+	
+	public void setTargetToStandardErr() {
+		target = STD_ERR;
+	}
+	
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		// get the target stream
+		stream = target == STD_OUT ? System.out : System.err;
+		
+		// set the prefix only if the degree of parallelism (DOP) is greater than 1
+		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
+				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+	}
+
+	@Override
+	public void invoke(IN record) {
+		if (prefix != null) {
+			stream.println(prefix + record.toString());
+		}
+		else {
+			stream.println(record.toString());
+		}
+	}
+	
+	@Override
+	public void close() throws Exception {
+		this.stream = null;
+		this.prefix = null;
+		super.close();
+	}
+	
+	@Override
+	public String toString() {
+		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 49cf15f..798724e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.state.OperatorState;
  */
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
-	private Environment env;
+	public Environment env;
 	private final Map<String, OperatorState<?>> operatorStates;
 
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,