Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2016/03/22 20:04:02 UTC

[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/11897

    [SPARK-14078] Streaming Parquet Based FileSink

    This PR adds a new `Sink` implementation that writes out Parquet files.  To handle partial failures correctly while maintaining exactly-once semantics, the files for each batch are written to a unique directory and then atomically appended to a metadata log.  When a Parquet-based `DataSource` is initialized for reading, we first check for this log directory and, when present, use it instead of file listing.
    
    Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.
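
The commit protocol described above can be sketched as follows. This is a simplified, local-filesystem illustration only; the object and method names are hypothetical, and the actual sink uses `HDFSMetadataLog` over Hadoop paths rather than `java.nio.file`:

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Simplified sketch of the batch-commit protocol (hypothetical names).
object BatchCommitSketch {
  // A batch becomes visible to readers only once its id appears in the log.
  // Writing to a temp file and renaming makes the append all-or-nothing,
  // so a partial failure can never expose a half-written batch entry.
  def commitBatch(logDir: File, batchId: Long, files: Seq[String]): Boolean = {
    val logFile = new File(logDir, batchId.toString)
    if (logFile.exists()) {
      false // this batch was already committed; skip the duplicate
    } else {
      val tmp = File.createTempFile(s"batch-$batchId-", ".tmp", logDir)
      Files.write(tmp.toPath, files.mkString("\n").getBytes("UTF-8"))
      Files.move(tmp.toPath, logFile.toPath, StandardCopyOption.ATOMIC_MOVE)
      true
    }
  }
}
```

Because a retried batch finds its id already in the log, re-executed writes leave stray files in the data directory but never duplicate entries in the log, which is what readers consult.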

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark fileSink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11897.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11897
    
----
commit e861a6837382eaddd2dbd85d01cbc99c0bcd06fa
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-03-22T05:31:21Z

    WIP

commit 8e8e4c60a8b6a2289de5867767e1d8ebd21d32ba
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-03-22T18:26:13Z

    cleanup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org



Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57087494
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala ---
    @@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
     
     class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     
    +  private implicit def toOption[A](a: A): Option[A] = Option(a)
    --- End diff --
    
    it (or some other change) is required for compilation
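
For context, the implicit in question rewrites any bare value into `Option(value)` at call sites, which is why removing it alone breaks compilation of tests that pass bare ids where the API expects `Option`s. A minimal self-contained illustration (the `get` signature here is a simplified stand-in, not the real `HDFSMetadataLog.get`):

```scala
import scala.language.implicitConversions

object ImplicitOptionDemo {
  // The implicit under discussion: lets a bare value stand in for Option(value).
  implicit def toOption[A](a: A): Option[A] = Option(a)

  // Simplified stand-in for a method taking optional batch-id bounds.
  def get(startId: Option[Long], endId: Option[Long]): String =
    s"$startId..$endId"

  // With the implicit in scope, the caller can write get(1L, 5L)
  // instead of get(Some(1L), Some(5L)).
  def demo: String = get(1L, 5L)
}
```

Implicit conversions like this are often discouraged (hence the review nit): they silently change the meaning of call sites, so dropping the implicit forces every caller to be updated explicitly.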



Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11897



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57087119
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala ---
    @@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
     
     class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     
    +  private implicit def toOption[A](a: A): Option[A] = Option(a)
    --- End diff --
    
    nit: remove it



Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200074914
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53807/
    Test FAILed.



Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200087081
  
    **[Test build #53847 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53847/consoleFull)** for PR 11897 at commit [`e821f2f`](https://github.com/apache/spark/commit/e821f2f558fc3426a13e89598eb1a3eb887daeff).



Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200074908
  
    Merged build finished. Test FAILed.



Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57210628
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -114,18 +114,24 @@ class FileStreamSource(
         val endId = end.asInstanceOf[LongOffset].offset
     
         assert(startId <= endId)
    -    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
    -    logDebug(s"Return files from batches ${startId + 1}:$endId")
    +    val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
    +    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logDebug(s"Streaming ${files.mkString(", ")}")
         dataFrameBuilder(files)
     
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    fs.listStatus(new Path(path))
    +    val startTime = System.nanoTime()
    +    val files = fs.listStatus(new Path(path))
           .filterNot(_.getPath.getName.startsWith("_"))
           .map(_.getPath.toUri.toString)
    +    val endTime = System.nanoTime()
    +    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    +    files
       }
     
       override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
    +
    +  override def toString: String = s"FileSink[$path]"
    --- End diff --
    
    Haha, yes.  I'll fix this in a follow up.
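
The timing pattern in the diff above (a `nanoTime` delta reported in milliseconds) can be factored into a small helper. This is an illustrative sketch, not code from the PR:

```scala
object Timing {
  // Wall-clock a block using the monotonic nanoTime counter, which is
  // preferable to currentTimeMillis for measuring durations because it
  // cannot jump when the system clock is adjusted.
  def timed[T](body: => T): (T, Double) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start).toDouble / 1e6
    (result, elapsedMs)
  }
}
```

Usage would look like `val (files, ms) = Timing.timed { expensiveListing() }`, after which `ms` can go straight into the log message.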



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57089301
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import java.io.File
    +import java.util.UUID
    +
    +import scala.util.Random
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.util.Utils
    +
    +/**
    + * A stress test for streaming queries that read and write files.  This test consists of
    + * two threads:
    + *  - one that writes out `numRecords` distinct integers to files of random sizes (the total
    + *    number of records is fixed but each file's size / creation time is random).
    + *  - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
    + *    any partition).
    + *
    + * At the end, the resulting files are loaded and the answer is checked.
    + */
    +class FileStressSuite extends StreamTest with SharedSQLContext {
    +  import testImplicits._
    +
    +  test("fault tolerance stress test") {
    +    val numRecords = 10000
    +    val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
    +    val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
    +    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
    +    val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
    +
    +    @volatile
    +    var continue = true
    +    var stream: ContinuousQuery = null
    --- End diff --
    
    nit: stream should be volatile
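
The reason behind the nit: `stream` is written by one thread and read by another, and without `@volatile` the JVM memory model does not guarantee the reader ever observes the update. A standalone sketch of the visibility guarantee (not the suite's actual code):

```scala
object VolatileFlagDemo {
  // Written by the main thread, polled by a worker thread; @volatile
  // guarantees the write becomes visible across threads.
  @volatile private var continue = true

  def run(): Boolean = {
    continue = true
    val worker = new Thread(new Runnable {
      def run(): Unit = while (continue) Thread.`yield`()
    })
    worker.start()
    continue = false // the worker is guaranteed to observe this write
    worker.join(5000)
    !worker.isAlive  // true once the worker exits its polling loop
  }
}
```

Without `@volatile`, the JIT is free to hoist the flag read out of the loop, and the worker may spin forever even after the flag is cleared.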



Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200130642
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53847/
    Test PASSed.



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57088463
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.StreamTest
    +import org.apache.spark.sql.execution.streaming.MemoryStream
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.util.Utils
    +
    +class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
    +  import testImplicits._
    +
    +  test("unpartitioned writing") {
    +    val inputData = MemoryStream[Int]
    +    val df = inputData.toDF()
    +
    +    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
    +    val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
    +
    +    val query =
    +      df.write
    +        .format("parquet")
    +        .option("checkpointLocation", checkpointDir)
    +        .startStream(outputDir)
    +
    +    inputData.addData(1, 2, 3)
    +    failAfter(streamingTimeout) { query.processAllAvailable() }
    --- End diff --
    
    Never mind. I just realized it will continue to set `noNewData` to true



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200082783
  
    Looks pretty good. Just some nits.



Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200130638
  
    Merged build finished. Test PASSed.



Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200074612
  
    **[Test build #53807 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53807/consoleFull)** for PR 11897 at commit [`8e8e4c6`](https://github.com/apache/spark/commit/8e8e4c60a8b6a2289de5867767e1d8ebd21d32ba).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging `
      * `trait FileFormat `



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57086874
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.UUID
    +
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.sources.FileFormat
    +
    +object FileStreamSink {
    +  // The name of the subdirectory that is used to store metadata about which files are valid.
    +  val metadataDir = "_spark_metadata"
    +}
    +
    +/**
    + * A sink that writes out results to parquet files.  Each batch is written out to a unique
    + * directory. After all of the files in a batch have been successfully written, the list of
    + * file paths is appended to the log atomically. In the case of partial failures, some duplicate
    + * data may be present in the target directory, but only one copy of each file will be present
    + * in the log.
    + */
    +class FileStreamSink(
    +    sqlContext: SQLContext,
    +    path: String,
    +    fileFormat: FileFormat) extends Sink with Logging {
    +
    +  val basePath = new Path(path)
    +  val logPath = new Path(basePath, FileStreamSink.metadataDir)
    +  val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
    --- End diff --
    
    nit: private



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57086868
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.UUID
    +
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.sources.FileFormat
    +
    +object FileStreamSink {
    +  // The name of the subdirectory that is used to store metadata about which files are valid.
    +  val metadataDir = "_spark_metadata"
    +}
    +
    +/**
    + * A sink that writes out results to parquet files.  Each batch is written out to a unique
    + * directory. After all of the files in a batch have been successfully written, the list of
    + * file paths is appended to the log atomically. In the case of partial failures, some duplicate
    + * data may be present in the target directory, but only one copy of each file will be present
    + * in the log.
    + */
    +class FileStreamSink(
    +    sqlContext: SQLContext,
    +    path: String,
    +    fileFormat: FileFormat) extends Sink with Logging {
    +
    +  val basePath = new Path(path)
    +  val logPath = new Path(basePath, FileStreamSink.metadataDir)
    --- End diff --
    
    nit: private



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57088232
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.StreamTest
    +import org.apache.spark.sql.execution.streaming.MemoryStream
    +import org.apache.spark.sql.test.SharedSQLContext
    +import org.apache.spark.util.Utils
    +
    +class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
    +  import testImplicits._
    +
    +  test("unpartitioned writing") {
    +    val inputData = MemoryStream[Int]
    +    val df = inputData.toDF()
    +
    +    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
    +    val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
    +
    +    val query =
    +      df.write
    +        .format("parquet")
    +        .option("checkpointLocation", checkpointDir)
    +        .startStream(outputDir)
    +
    +    inputData.addData(1, 2, 3)
    +    failAfter(streamingTimeout) { query.processAllAvailable() }
    --- End diff --
    
    There is a race condition here: `noNewData` may become `true` before `processAllAvailable`.
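
The standard fix for this kind of lost-wakeup race is to evaluate the predicate under the same lock that guards it and wait in a loop, rather than latching on a one-shot flag. A minimal sketch of the pattern (names like `pendingBatches` are hypothetical, not the actual `StreamExecution` internals):

```scala
object AwaitAllAvailable {
  private val lock = new Object
  private var pendingBatches = 0

  def batchStarted(): Unit = lock.synchronized { pendingBatches += 1 }

  def batchFinished(): Unit = lock.synchronized {
    pendingBatches -= 1
    lock.notifyAll()
  }

  // Safe even if all work finished *before* this is called: the predicate
  // is re-checked under the lock, so no notification can be missed, and
  // the while loop also guards against spurious wakeups.
  def processAllAvailable(): Unit = lock.synchronized {
    while (pendingBatches > 0) lock.wait()
  }
}
```

Because the waiter holds the lock while checking `pendingBatches`, a batch that completes early simply leaves the count at zero and the wait is skipped entirely, closing the race the comment describes.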



Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-199967158
  
    **[Test build #53807 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53807/consoleFull)** for PR 11897 at commit [`8e8e4c6`](https://github.com/apache/spark/commit/8e8e4c60a8b6a2289de5867767e1d8ebd21d32ba).



Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200117051
  
    Merged build finished. Test PASSed.



Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57086859
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.UUID
    +
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.sources.FileFormat
    +
    +object FileStreamSink {
    +  // The name of the subdirectory that is used to store metadata about which files are valid.
    +  val metadataDir = "_spark_metadata"
    +}
    +
    +/**
    + * A sink that writes out results to parquet files.  Each batch is written out to a unique
    + * directory. After all of the files in a batch have been successfully written, the list of
    + * file paths is appended to the log atomically. In the case of partial failures, some duplicate
    + * data may be present in the target directory, but only one copy of each file will be present
    + * in the log.
    + */
    +class FileStreamSink(
    +    sqlContext: SQLContext,
    +    path: String,
    +    fileFormat: FileFormat) extends Sink with Logging {
    +
    +  val basePath = new Path(path)
    --- End diff --
    
    nit: private
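    For intuition, here is a minimal Python sketch (not Spark's actual Scala API; all
    names are illustrative) of the write-then-commit protocol the FileStreamSink
    docstring in the diff above describes: each batch's files go to a unique
    directory, and the file list is appended to the metadata log only after every
    write succeeds, so readers that trust the log never see a partially written batch.

    ```python
    import os
    import uuid

    def write_batch(base_path, batch_id, records):
        """Write a batch to a unique directory, then commit it to the log.

        A failure before the final rename leaves orphan files on disk but
        nothing in the log, so readers still see exactly-once output.
        """
        batch_dir = os.path.join(base_path, uuid.uuid4().hex)
        os.makedirs(batch_dir)
        written = []
        for i, rec in enumerate(records):
            part = os.path.join(batch_dir, "part-%d" % i)
            with open(part, "w") as f:
                f.write(rec)
            written.append(part)
        # Commit point: rewrite the log via an atomic rename (POSIX os.replace).
        log_path = os.path.join(base_path, "_spark_metadata")
        tmp = log_path + ".tmp"
        existing = ""
        if os.path.exists(log_path):
            with open(log_path) as f:
                existing = f.read()
        with open(tmp, "w") as f:
            f.write(existing + "%s:%s\n" % (batch_id, ",".join(written)))
        os.replace(tmp, log_path)
        return written

    def committed_files(base_path):
        """A reader consults only the log, ignoring uncommitted directories."""
        log_path = os.path.join(base_path, "_spark_metadata")
        if not os.path.exists(log_path):
            return []
        files = []
        with open(log_path) as f:
            for line in f:
                files.extend(line.strip().split(":", 1)[1].split(","))
        return files
    ```

    A crashed writer leaves an uncommitted directory behind, but since readers only
    consult the log, duplicate data on disk is harmless.
    
    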




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57086227
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala ---
    @@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
      */
     @Experimental
     class ContinuousQueryException private[sql](
    -    val query: ContinuousQuery,
    +    @transient val query: ContinuousQuery,
         val message: String,
         val cause: Throwable,
         val startOffset: Option[Offset] = None,
         val endOffset: Option[Offset] = None
    -  ) extends Exception(message, cause) {
    +  ) extends Exception(message, cause) with Serializable {
    --- End diff --
    
    nit: `Exception` already extends `Serializable`. No need to add it again.




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200115565
  
    **[Test build #53841 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53841/consoleFull)** for PR 11897 at commit [`7da7c7e`](https://github.com/apache/spark/commit/7da7c7eb455d065a67a5f6143ce8ca99102f3742).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200129448
  
    **[Test build #53847 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53847/consoleFull)** for PR 11897 at commit [`e821f2f`](https://github.com/apache/spark/commit/e821f2f558fc3426a13e89598eb1a3eb887daeff).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11897#discussion_r57102528
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -114,18 +114,24 @@ class FileStreamSource(
         val endId = end.asInstanceOf[LongOffset].offset
     
         assert(startId <= endId)
    -    val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
    -    logDebug(s"Return files from batches ${startId + 1}:$endId")
    +    val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
    +    logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logDebug(s"Streaming ${files.mkString(", ")}")
         dataFrameBuilder(files)
     
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    fs.listStatus(new Path(path))
    +    val startTime = System.nanoTime()
    +    val files = fs.listStatus(new Path(path))
           .filterNot(_.getPath.getName.startsWith("_"))
           .map(_.getPath.toUri.toString)
    +    val endTime = System.nanoTime()
    +    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    +    files
       }
     
       override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
    +
    +  override def toString: String = s"FileSink[$path]"
    --- End diff --
    
    This is the file source :D
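    As a hypothetical Python analogue of the `fetchAllFiles` method in the diff
    above (not Spark's actual code; the function name and log format are
    illustrative): list the directory, drop `_`-prefixed entries such as the
    `_spark_metadata` log, and report how long the listing took.

    ```python
    import os
    import time

    def fetch_all_files(path):
        """List a directory, skipping '_'-prefixed metadata entries,
        and log the elapsed listing time in milliseconds."""
        start = time.perf_counter()
        files = sorted(
            os.path.join(path, name)
            for name in os.listdir(path)
            if not name.startswith("_")
        )
        elapsed_ms = (time.perf_counter() - start) * 1000
        print("Listed %d files in %.2fms" % (len(files), elapsed_ms))
        return files
    ```

    Filtering `_`-prefixed names mirrors Hadoop's convention of treating such
    entries as hidden, which is what keeps the metadata log itself out of the
    stream's input.
    
    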




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200117057
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53841/
    Test PASSed.




[GitHub] spark pull request: [SPARK-14078] Streaming Parquet Based FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11897#issuecomment-200079104
  
    **[Test build #53841 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53841/consoleFull)** for PR 11897 at commit [`7da7c7e`](https://github.com/apache/spark/commit/7da7c7eb455d065a67a5f6143ce8ca99102f3742).

