You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 20:55:16 UTC
flink git commit: [FLINK-2069] Fix Scala CSV Output Format
Repository: flink
Updated Branches:
refs/heads/master 39ec54ff1 -> a750415b6
[FLINK-2069] Fix Scala CSV Output Format
Closes #759
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a750415b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a750415b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a750415b
Branch: refs/heads/master
Commit: a750415b60a8f5bb150378fc49b5985c4ccb8c57
Parents: 39ec54f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jun 2 16:37:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 17:05:01 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 11 +++++-
.../api/functions/sink/FileSinkFunction.java | 1 +
.../flink/streaming/api/scala/DataStream.scala | 35 ++++++++++++++++----
3 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1ec440d..e0a8a7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1230,7 +1230,16 @@ public class DataStream<OUT> {
return returnStream;
}
- private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
+ /**
+ * Writes a DataStream using the given {@link OutputFormat}. The
+ * writing is performed periodically, in every millis milliseconds.
+ *
+ * @param format The output format that should be used for writing.
+ * @param millis the file update frequency
+ *
+ * @return the closed DataStream
+ */
+ public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 43ee2a7..1cf5c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -105,6 +105,7 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
} catch (Throwable t) {
LOG.error("Cleanup on error failed.", t);
}
+ throw new RuntimeException(ex);
}
resetParameters();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f32d0cc..3b2183b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
+import org.apache.flink.core.fs.{FileSystem, Path}
+
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -27,10 +31,10 @@ import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, GroupedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, GroupedDataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction}
import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
@@ -728,8 +732,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
* is written.
*
*/
- def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
- javaStream.writeAsCsv(path, millis)
+ def writeAsCsv(
+ path: String,
+ millis: Long = 0,
+ rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+ fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
+ writeMode: FileSystem.WriteMode = null): DataStream[T] = {
+ require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+ val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter)
+ if (writeMode != null) {
+ of.setWriteMode(writeMode)
+ }
+ javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis)
+ }
+
+ /**
+ * Writes a DataStream using the given [[OutputFormat]]. The
+ * writing is performed periodically, in every millis milliseconds.
+ */
+ def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
+ javaStream.writeToFile(format, millis)
+ }
/**
* Writes the DataStream to a socket as a byte array. The format of the output is
@@ -744,8 +767,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* method is called.
*
*/
- def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
- javaStream.addSink(sinkFuntion)
+ def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+ javaStream.addSink(sinkFunction)
/**
* Adds the given sink to this DataStream. Only streams with sinks added