Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/11/24 16:31:11 UTC
[jira] [Commented] (FLINK-2622) Scala DataStream API does not have
writeAsText method which supports WriteMode
[ https://issues.apache.org/jira/browse/FLINK-2622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15024680#comment-15024680 ]
ASF GitHub Bot commented on FLINK-2622:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1098#discussion_r45749794
--- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
@@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
* every element of the DataStream the result of .toString
* is written.
*
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
*/
def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
javaStream.writeAsText(path, millis)
/**
+ * Writes a DataStream to the file specified by path in text format. For
+ * every element of the DataStream the result of .toString
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ *
+ * @return the closed DataStream
+ *
+ */
+ def writeAsText(
+ path: String,
+ writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+ if (writeMode != null) {
+ javaStream.writeAsText(path, writeMode)
+ } else {
+ javaStream.writeAsText(path)
+ }
+ }
+
+ /**
* Writes a DataStream to the file specified by path in text format. The
* writing is performed periodically, in every millis milliseconds. For
* every element of the DataStream the result of .toString
* is written.
*
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ *
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
+ *
+ */
+ def writeAsText(
+ path: String,
+ writeMode: FileSystem.WriteMode,
+ millis: Long): DataStreamSink[T] = {
+ if (writeMode != null) {
+ javaStream.writeAsText(path, writeMode, millis)
+ } else {
+ javaStream.writeAsText(path, millis)
+ }
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of .toString
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @param millis
+ * the file update frequency
+ *
+ * @return the closed DataStream
+ */
+ def writeAsCsv(
+ path: String,
+ millis: Long = 0): DataStreamSink[T] = {
+ require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+ val of = new ScalaCsvOutputFormat[Product](
+ new Path(path),
+ ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+ ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
+ javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. For
+ * every element of the DataStream the result of .toString
+ * is written.
+ *
+ * @param path
+ * the path pointing to the location the text file is written to
+ *
+ * @param writeMode
+ * Controls the behavior for existing files. Options are
+ * NO_OVERWRITE and OVERWRITE.
+ *
+ * @return the closed DataStream
+ */
+ def writeAsCsv(
+ path: String,
+ writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
+ require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+ val of = new ScalaCsvOutputFormat[Product](
+ new Path(path),
+ ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+ ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
+ if (writeMode != null) {
+ of.setWriteMode(writeMode)
+ }
+ javaStream.write(of.asInstanceOf[OutputFormat[T]], 0L)
--- End diff --
Same here: call the most generic method.
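A minimal sketch of what this suggestion could look like, assuming "most generic" means the overload that takes every parameter. The types below (`WriteMode`, `SinkCall`, `SketchStream`) are hypothetical stand-ins so the example runs on its own; they are not Flink classes, and the actual change in the pull request may differ.

```scala
// Hypothetical stand-in for FileSystem.WriteMode; not the Flink class.
object WriteMode extends Enumeration {
  val NO_OVERWRITE, OVERWRITE = Value
}

// Hypothetical record of what the sink was asked to do, so delegation can be inspected.
final case class SinkCall(path: String, writeMode: Option[WriteMode.Value], millis: Long)

// Hypothetical stream wrapper illustrating overload delegation.
class SketchStream {
  // Most generic overload: the only place that holds the actual logic.
  // A null writeMode is tolerated here, mirroring the null check in the diff.
  def writeAsText(path: String, writeMode: WriteMode.Value, millis: Long): SinkCall =
    SinkCall(path, Option(writeMode), millis)

  // The narrower overloads only fill in defaults and delegate.
  def writeAsText(path: String, writeMode: WriteMode.Value): SinkCall =
    writeAsText(path, writeMode, 0L)

  def writeAsText(path: String, millis: Long): SinkCall =
    writeAsText(path, null, millis)

  def writeAsText(path: String): SinkCall =
    writeAsText(path, null, 0L)
}
```

With this shape the null handling and the call into the underlying stream live in one method, so the overloads cannot drift apart.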
> Scala DataStream API does not have writeAsText method which supports WriteMode
> ------------------------------------------------------------------------------
>
> Key: FLINK-2622
> URL: https://issues.apache.org/jira/browse/FLINK-2622
> Project: Flink
> Issue Type: Bug
> Components: Scala API, Streaming
> Reporter: Till Rohrmann
>
> The Scala DataStream API, unlike the Java DataStream API, does not support a {{writeAsText}} method which takes the {{WriteMode}} as a parameter. In order to make the two APIs consistent, it should be added to the Scala DataStream API.