Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/11/24 16:31:11 UTC

[jira] [Commented] (FLINK-2622) Scala DataStream API does not have writeAsText method which supports WriteMode

    [ https://issues.apache.org/jira/browse/FLINK-2622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15024680#comment-15024680 ] 

ASF GitHub Bot commented on FLINK-2622:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1098#discussion_r45749794
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
        */
       def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
         javaStream.writeAsText(path, millis)
     
       /**
    +   * Writes a DataStream to the file specified by path in text format. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode)
    +    } else {
    +      javaStream.writeAsText(path)
    +    }
    +  }
    +
    +  /**
        * Writes a DataStream to the file specified by path in text format. The
        * writing is performed periodically, in every millis milliseconds. For
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode,
    +       millis: Long): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode, millis)
    +    } else {
    +      javaStream.writeAsText(path, millis)
    +    }
    +  }
    +
    +  /**
    +   * Writes a DataStream to the file specified by path in csv format. The
    +   * writing is performed periodically, in every millis milliseconds. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   */
    +  def writeAsCsv(
    +      path: String,
    +      millis: Long = 0): DataStreamSink[T] = {
    +    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    +    val of = new ScalaCsvOutputFormat[Product](
    +      new Path(path),
    +      ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    +      ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    +    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
    +  }
    +
    +  /**
    +   * Writes a DataStream to the file specified by path in csv format. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @return the closed DataStream
    +   */
    +  def writeAsCsv(
    +      path: String,
    +      writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    +    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    +    val of = new ScalaCsvOutputFormat[Product](
    +      new Path(path),
    +      ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    +      ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    +    if (writeMode != null) {
    +      of.setWriteMode(writeMode)
    +    }
    +    javaStream.write(of.asInstanceOf[OutputFormat[T]], 0L)
    --- End diff --
    
    Same here: Call the most generic method here.
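
One reading of "call the most generic method": rather than each overload constructing its own ScalaCsvOutputFormat and invoking javaStream.write directly, the narrower overloads should forward to the fullest writeAsCsv overload, so the require check, the output-format setup, and the WriteMode handling live in a single place. A minimal sketch of that shape inside DataStream[T], assuming a three-argument writeAsCsv(path, writeMode, millis) overload exists further down in the diff (it is not shown in the quoted hunk):

    def writeAsCsv(
        path: String,
        writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
      // Forward to the assumed three-argument overload; 0L matches the
      // default update frequency used by the other overloads in this diff.
      writeAsCsv(path, writeMode, 0L)
    }

Keeping the output-format construction in one overload also means a later change, such as swapping the underlying write call, only has to be applied once.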


> Scala DataStream API does not have writeAsText method which supports WriteMode
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-2622
>                 URL: https://issues.apache.org/jira/browse/FLINK-2622
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API, Streaming
>            Reporter: Till Rohrmann
>
> The Scala DataStream API, unlike the Java DataStream API, does not support a {{writeAsText}} method which takes the {{WriteMode}} as a parameter. In order to make the two APIs consistent, it should be added to the Scala DataStream API.
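
For readers outside the pull request, a short usage sketch of the gap being closed: the Java DataStream API already accepts a FileSystem.WriteMode, and the overload added by this pull request lets Scala code pass one as well. The object name, output path, and element values below are illustrative only.

    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.streaming.api.scala._

    object WriteModeExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream: DataStream[String] = env.fromElements("a", "b", "c")
        // With the new overload, the Scala API can control the behavior for
        // existing files (NO_OVERWRITE or OVERWRITE), matching the Java API.
        stream.writeAsText("/tmp/flink-out.txt", FileSystem.WriteMode.OVERWRITE)
        env.execute("writeAsText WriteMode example")
      }
    }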


