You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/03 20:55:16 UTC

flink git commit: [FLINK-2069] Fix Scala CSV Output Format

Repository: flink
Updated Branches:
  refs/heads/master 39ec54ff1 -> a750415b6


[FLINK-2069] Fix Scala CSV Output Format

Closes #759


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a750415b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a750415b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a750415b

Branch: refs/heads/master
Commit: a750415b60a8f5bb150378fc49b5985c4ccb8c57
Parents: 39ec54f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jun 2 16:37:06 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Jun 3 17:05:01 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 11 +++++-
 .../api/functions/sink/FileSinkFunction.java    |  1 +
 .../flink/streaming/api/scala/DataStream.scala  | 35 ++++++++++++++++----
 3 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1ec440d..e0a8a7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1230,7 +1230,16 @@ public class DataStream<OUT> {
 		return returnStream;
 	}
 
-	private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
+	/**
+	 * Writes a DataStream using the given {@link OutputFormat}. The
+	 * writing is performed periodically, once every {@code millis} milliseconds.
+	 *
+	 * @param format The output format that should be used for writing.
+	 * @param millis the interval between file updates, in milliseconds
+	 *
+	 * @return the closed DataStream
+	 */
+	public DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
 		DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 43ee2a7..1cf5c07 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -105,6 +105,7 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 			} catch (Throwable t) {
 				LOG.error("Cleanup on error failed.", t);
 			}
+			throw new RuntimeException(ex);
 		}
 		resetParameters();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a750415b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f32d0cc..3b2183b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
+import org.apache.flink.core.fs.{FileSystem, Path}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -27,10 +31,10 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, GroupedDataStream, SingleOutputStreamOperator}
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, DataStreamSink, GroupedDataStream, SingleOutputStreamOperator}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.SumFunction
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.{FileSinkFunctionByMillis, SinkFunction}
 import org.apache.flink.streaming.api.operators.{StreamGroupedReduce, StreamReduce}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
@@ -728,8 +732,27 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * is written.
    *
    */
-  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
-    javaStream.writeAsCsv(path, millis)
+  def writeAsCsv(
+      path: String,
+      millis: Long = 0,
+      rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+      fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
+      writeMode: FileSystem.WriteMode = null): DataStream[T] = {
+    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+    val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter)
+    if (writeMode != null) {
+      of.setWriteMode(writeMode)
+    }
+    javaStream.writeToFile(of.asInstanceOf[OutputFormat[T]], millis)
+  }
+
+  /**
+   * Writes a DataStream using the given [[OutputFormat]]. The
+   * writing is performed periodically, once every `millis` milliseconds.
+   */
+  def writeToFile(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
+    javaStream.writeToFile(format, millis)
+  }
 
   /**
    * Writes the DataStream to a socket as a byte array. The format of the output is
@@ -744,8 +767,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * method is called.
    *
    */
-  def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
-    javaStream.addSink(sinkFuntion)
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
 
   /**
    * Adds the given sink to this DataStream. Only streams with sinks added