You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 13:22:29 UTC
[2/2] incubator-flink git commit: [streaming] DataStream print
functionality update
[streaming] DataStream print functionality update
PrintSinkFunction now explicitly states threads in output
Added printToErr functionality
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b5ac6ec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b5ac6ec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b5ac6ec7
Branch: refs/heads/master
Commit: b5ac6ec7d97edcd224ac990c1f0314bc8acdfa9e
Parents: 61b023f
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 16 12:48:09 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Tue Dec 16 23:39:05 2014 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 17 ++-
.../api/function/sink/PrintSinkFunction.java | 128 ++++++++++++++-----
.../streamvertex/StreamingRuntimeContext.java | 2 +-
3 files changed, 112 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 978f5fa..3fc685a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -780,7 +780,7 @@ public class DataStream<OUT> {
}
/**
- * Writes a DataStream to the standard output stream (stdout). For each
+ * Writes a DataStream to the standard output stream (stdout).<br> For each
* element of the DataStream the result of {@link Object#toString()} is
* written.
*
@@ -793,6 +793,21 @@ public class DataStream<OUT> {
return returnStream;
}
+
+ /**
+ * Writes a DataStream to the standard output stream (stderr).<br> For each
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @return The closed DataStream.
+ */
+ public DataStreamSink<OUT> printToErr() {
+ DataStream<OUT> inputStream = this.copy();
+ PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>(true);
+ DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
+
+ return returnStream;
+ }
/**
* Writes a DataStream to the file specified by path in text format. For
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index fc75da7..d460749 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -1,36 +1,98 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.sink;
-
-
-/**
- * Dummy implementation of the SinkFunction writing every tuple to the standard
- * output. Used for print.
- *
- * @param <IN>
- * Input tuple type
- */
-public class PrintSinkFunction<IN> implements SinkFunction<IN> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(IN tuple) {
- System.out.println(tuple);
- }
-
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.PrintStream;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
+
+/**
+ * Implementation of the SinkFunction writing every tuple to the standard
+ * output or standard error stream.
+ *
+ * @param <IN>
+ * Input record type
+ */
+public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final boolean STD_OUT = false;
+ private static final boolean STD_ERR = true;
+
+ private boolean target;
+ private transient PrintStream stream;
+ private transient String prefix;
+
+ /**
+ * Instantiates a print sink function that prints to standard out.
+ */
+ public PrintSinkFunction() {}
+
+ /**
+ * Instantiates a print sink function that prints to standard out.
+ *
+ * @param stdErr True, if the format should print to standard error instead of standard out.
+ */
+ public PrintSinkFunction(boolean stdErr) {
+ target = stdErr;
+ }
+
+ public void setTargetToStandardOut() {
+ target = STD_OUT;
+ }
+
+ public void setTargetToStandardErr() {
+ target = STD_ERR;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+ // get the target stream
+ stream = target == STD_OUT ? System.out : System.err;
+
+ // set the prefix if we have a >1 DOP
+ prefix = (context.getNumberOfParallelSubtasks() > 1) ?
+ ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+ }
+
+ @Override
+ public void invoke(IN record) {
+ if (prefix != null) {
+ stream.println(prefix + record.toString());
+ }
+ else {
+ stream.println(record.toString());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.stream = null;
+ this.prefix = null;
+ super.close();
+ }
+
+ @Override
+ public String toString() {
+ return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b5ac6ec7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index 49cf15f..798724e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.state.OperatorState;
*/
public class StreamingRuntimeContext extends RuntimeUDFContext {
- private Environment env;
+ public Environment env;
private final Map<String, OperatorState<?>> operatorStates;
public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,