Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2016/02/01 20:08:14 UTC

[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/11006

    [SPARK-10820][SQL] Support for the continuous execution of structured queries

    This is a follow-up to 9aadcffabd226557174f3ff566927f873c71672e that extends Spark SQL to allow users to _repeatedly_ optimize and execute structured queries.  A `ContinuousQuery` can be expressed using SQL, DataFrames or Datasets.  The purpose of this PR is only to add some initial infrastructure which will be extended in subsequent PRs.
    
    ## User-facing API
    
    - `sqlContext.streamFrom` and `df.streamTo` return builder objects that are analogous to the `read/write` interfaces already available for executing queries in a batch-oriented fashion.
    - `ContinuousQuery` provides an interface for interacting with a query that is currently executing in the background.
    
    ## Internal Interfaces
     - `StreamExecution` - executes streaming queries in micro-batches
    
    The following are currently internal, but public APIs will be provided in a future release.
     - `Source` - an interface for providers of continually arriving data.  A source must have a notion of an `Offset` that monotonically tracks what data has arrived.  For fault tolerance, a source must be able to replay data given a start offset.
     - `Sink` - an interface that accepts the results of a continuously executing query.  A sink is also responsible for tracking the offset from which execution should resume after a failure.
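
A rough, self-contained sketch of the contract described in the two items above. It is not the set of interfaces from this PR: the names `LongOffset`, `SimpleSource`, `SimpleSink`, `InMemorySource`, and `InMemorySink` are hypothetical, and plain Scala collections stand in for DataFrames. It shows a source whose offset grows monotonically and that can replay data from a given start offset, and a sink that records the offset to resume from.

```scala
// Hypothetical simplification: an offset is the number of items consumed so far.
final case class LongOffset(value: Long)

// A source returns data that arrived after a start offset, so it can replay after a failure.
trait SimpleSource[A] {
  /** Data that arrived after `start`, plus the offset at which that data ends, if any. */
  def getNextBatch(start: Option[LongOffset]): Option[(Seq[A], LongOffset)]
}

// A sink stores results and remembers the offset up to which data was committed.
trait SimpleSink[A] {
  def currentOffset: Option[LongOffset]
  def addBatch(end: LongOffset, data: Seq[A]): Unit
}

final class InMemorySource[A] extends SimpleSource[A] {
  private val log = scala.collection.mutable.ArrayBuffer.empty[A]

  /** Appends data and returns the offset that marks the end of it. */
  def addData(data: A*): LongOffset = synchronized {
    log ++= data
    LongOffset(log.size.toLong)
  }

  override def getNextBatch(start: Option[LongOffset]): Option[(Seq[A], LongOffset)] =
    synchronized {
      val from = start.map(_.value.toInt).getOrElse(0)
      if (from >= log.size) None
      else Some((log.slice(from, log.size).toList, LongOffset(log.size.toLong)))
    }
}

final class InMemorySink[A] extends SimpleSink[A] {
  private var committed: Option[LongOffset] = None
  private val rows = scala.collection.mutable.ArrayBuffer.empty[A]

  override def currentOffset: Option[LongOffset] = synchronized(committed)

  // Data and offset are recorded together so a restart resumes from a consistent point.
  override def addBatch(end: LongOffset, data: Seq[A]): Unit = synchronized {
    rows ++= data
    committed = Some(end)
  }

  def allData: Seq[A] = synchronized(rows.toList)
}
```

In this sketch a restart would simply call `getNextBatch(sink.currentOffset)`, which skips everything the sink has already committed.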
    
    ## Testing
     - `MemoryStream` and `MemorySink` - simple implementations of source and sink that keep all data in memory and have methods for simulating durability failures
     - `StreamTest` - a framework for performing actions and checking invariants on a continuous query
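
To give a feel for the "perform actions, then check invariants" style, here is a deliberately tiny, synchronous sketch. The names `Step`, `Add`, `Check`, and `MiniStreamTest` are hypothetical and not part of this PR; the real framework drives a query running on another thread, whereas this version just applies a function to each added value.

```scala
// Hypothetical step types: add input values, or check what the sink holds so far.
sealed trait Step
final case class Add(values: Seq[Int]) extends Step
final case class Check(expected: Seq[Int]) extends Step

object MiniStreamTest {
  /** Runs the steps in order; returns Left(message) describing the first failed check. */
  def run(query: Int => Int)(steps: Step*): Either[String, Unit] =
    steps.zipWithIndex
      .foldLeft[Either[String, Vector[Int]]](Right(Vector.empty)) {
        // Adding data pushes each value through the query into the in-memory sink.
        case (Right(sink), (Add(values), _)) => Right(sink ++ values.map(query))
        // A check compares everything produced so far with the expected answer.
        case (Right(sink), (Check(expected), i)) =>
          if (sink == expected.toVector) Right(sink)
          else Left(s"step $i: expected ${expected.mkString(",")} but got ${sink.mkString(",")}")
        // Once a check has failed, the remaining steps are skipped.
        case (failed, _) => failed
      }
      .map(_ => ())
}
```

For example, `MiniStreamTest.run(_ + 1)(Add(Seq(1, 2, 3)), Check(Seq(2, 3, 4)))` evaluates to `Right(())`.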

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark structured-streaming

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11006.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11006
    
----
commit e238911f494c2db41e36c9cde9dd0b733630b36f
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-10T02:51:06Z

    first draft

commit d2706b511f254feff7d4558550e403215ffb795f
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-11T07:49:33Z

    working on state

commit 7a3590fe5dd659d1a47e645eec247e21280a3373
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-11T19:26:40Z

    working on stateful streaming

commit c8a923831bbfc24f71eb744e36a15e432d6ae067
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-12T00:34:19Z

    now with event time windows

commit dddd192b7fb9efc59f22009adf61a432b5bafc19
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-13T03:16:48Z

    some refactoring after talking to ali

commit a0a1e7bbd85d0de3f6d90793b894a8b00f86a7f0
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-15T00:47:55Z

    docs

commit 15bed31800319547eb616c0cc22c564bf967b7f1
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-15T18:04:42Z

    start kinesis

commit 89464a91e3954f30b68a1f633e2b9c062e08fa2c
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-17T02:18:35Z

    some renaming

commit d133dbbed3d3ff7146129cd6532b1f26f2201315
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-12-28T00:01:57Z

    WIP: file source

commit e3c4c8301fdcfaaa0bd56ed92a81e4e1d2db64a8
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-05T05:39:40Z

    Merge remote-tracking branch 'origin/master' into streaming-infra
    
    Conflicts:
    	sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala

commit 90fa6d30f0bf70a7c0c4bfe0889a07cbdddbf203
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-05T05:52:43Z

    remove half-baked stateful implementation

commit b1c1dc6ead2c76589e48f4c28ef02e6c4361e549
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-05T06:36:46Z

    cleanup

commit 92050688699b4c509f5ec5c2b636797cb5ae3c89
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-05T23:32:34Z

    more docs

commit 7a59f00b4f0d238c33d3250fcfe162d7c2c27c18
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T00:00:48Z

    Add circle.yml file.

commit eab186d5780b2946a32041ca17d80323b052efc6
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T00:48:42Z

    rollback changes

commit 20750630bd50bf7d0abad4a23b2a717705115e46
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T00:56:16Z

    Try using cached resolution to speed up compilation

commit dabc102271e09bae2ddca1366deb13b20c10ded3
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T00:59:10Z

    Use assembly/assembly.

commit 4630a87751bad6660d12645a5a7e56ea39dad48b
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T01:10:02Z

    Add test:compile so that test dependencies are also cached.

commit cd575db5bb4ab1e954f80a3b0d044ab492394443
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T01:10:23Z

    some feedback

commit 03f7c5daeb1cc9ede78651a7722923dbfadc868c
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T01:34:14Z

    Disable cached resolution.

commit 0760d16cfde55382d412ad002f019ceb848a6188
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T02:20:48Z

    Run assembly and test in separate commands

commit 90125826d744a2fdff4743cd33e5d9ce531aab1d
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T03:44:13Z

    Merge pull request #22 from marmbrus/circle-ci
    
    Add circle.yml file for configuring CircleCI

commit d233eaed9b9ec98a8155039a46d6e8b6b8a4d6ea
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T04:41:13Z

    comments

commit 3423dce332989c7997b1137d2a708fc6d75ec244
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T05:13:06Z

    Merge remote-tracking branch 'marmbrus/streaming-df' into streaming-infra

commit b0b20e50d4f712763a7f6edea8af7a39edeffe33
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T06:16:16Z

    Update circle.yml

commit f5d9642859a2862abfff924e700fbacf52807004
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T06:17:53Z

    Update SparkBuild.scala

commit f8911e4d6a6d32044b55212ae187eb4251bdfe02
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T06:44:31Z

    Update circle.yml

commit 6387781c63597680e341244d6347e9760c56c808
Author: Josh Rosen <ro...@gmail.com>
Date:   2016-01-06T07:03:01Z

    Add newlines to satisfy Scalastyle

commit 6242bc2eaecae4645622604d6e0de9802b472715
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T20:01:45Z

    revert CI changes

commit 95fd97807968c2437f104dc3e4578abad14f9554
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-06T21:38:53Z

    update based on TD's comments

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178336884
  
    Test this please




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51504974
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming
    +
    +import org.apache.spark.sql.StreamTest
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class MemorySourceStressSuite extends StreamTest with SharedSQLContext {
    +  import testImplicits._
    +
    +  test("memory stress test") {
    +    val input = MemoryStream[Int]
    +    val mapped = input.toDS().map(_ + 1)
    +
    +    createStressTest(mapped, AddData(input, _: _*))
    --- End diff --
    
    nit: createStressTest --> runStressTest (because it does not just create the stress test, it actually runs it)




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51501096
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +import org.scalatest.concurrent.Timeouts
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.streaming._
    +
    +/**
    + * A framework for implementing tests for streaming queries and sources.
    + *
    + * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
    + * blocking as necessary to let the stream catch up.  For example, the following adds some data to
    + * a stream, blocking until it can verify that the correct values are eventually produced.
    + *
    + * {{{
    + *  val inputData = MemoryStream[Int]
    + *  val mapped = inputData.toDS().map(_ + 1)
    + *
    + *  testStream(mapped)(
    + *    AddData(inputData, 1, 2, 3),
    + *    CheckAnswer(2, 3, 4))
    + * }}}
    + *
    + * Note that while we do sleep to allow the other thread to progress without spinning,
    + * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
    + * should check the actual progress of the stream before verifying the required test condition.
    + *
    + * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
    + * avoid hanging forever in the case of failures. However, individual suites can change this
    + * by overriding `streamingTimeout`.
    + */
    +trait StreamTest extends QueryTest with Timeouts {
    +
    +  implicit class RichSource(s: Source) {
    +    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
    +  }
    +
    +  /** How long to wait for an active stream to catch up when checking a result. */
    +  val streamingTimout = 10.seconds
    +
    +  /** A trait for actions that can be performed while testing a streaming DataFrame. */
    +  trait StreamAction
    +
    +  /** A trait to mark actions that require the stream to be actively running. */
    +  trait StreamMustBeRunning
    +
    +  /**
    +   * Adds the given data to the stream.  Subsequent check answers will block until this data has
    +   * been processed.
    +   */
    +  object AddData {
    +    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
    +      AddDataMemory(source, data)
    +  }
    +
    +  /** A trait that can be extended when testing other sources. */
    +  trait AddData extends StreamAction {
    +    def source: Source
    +
    +    /**
    +     * Called to trigger adding the data.  Should return the offset that will denote when this
    +     * new data has been processed.
    +     */
    +    def addData(): Offset
    +  }
    +
    +  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
    +    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
    +
    +    override def addData(): Offset = {
    +      source.addData(data)
    +    }
    +  }
    +
    +  /**
    +   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
    +   * This operation automatically blocks until all added data has been processed.
    +   */
    +  object CheckAnswer {
    +    def apply[A : Encoder](data: A*): CheckAnswerRows = {
    +      val encoder = encoderFor[A]
    +      val toExternalRow = RowEncoder(encoder.schema)
    +      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
    +    }
    +
    +    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
    +  }
    +
    +  case class CheckAnswerRows(expectedAnswer: Seq[Row])
    +      extends StreamAction with StreamMustBeRunning {
    +    override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
    +  }
    +
    +  case class DropBatches(num: Int) extends StreamAction
    +
    +  /** Stops the stream.  It must currently be running. */
    +  case object StopStream extends StreamAction
    +
    +  /** Starts the stream, resuming if data has already been processed.  It must not be running. */
    +  case object StartStream extends StreamAction
    +
    +  /** Signals that a failure is expected and should not kill the test. */
    +  case object ExpectFailure extends StreamAction
    +
    +  /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
    +  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
    +    testStream(stream.toDF())(actions: _*)
    +
    +  /**
    +   * Executes the specified actions on the given streaming DataFrame and provides helpful
    +   * error messages in the case of failures or incorrect answers.
    +   *
    +   * Note that if the stream is not explicitly started before an action that requires it to be
    +   * running then it will be automatically started before performing any other actions.
    +   */
    +  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
    +    var pos = 0
    +    var currentPlan: LogicalPlan = stream.logicalPlan
    +    var currentStream: StreamExecution = null
    +    val awaiting = new mutable.HashMap[Source, Offset]()
    +    val sink = new MemorySink(stream.schema)
    +
    +    @volatile
    +    var streamDeathCause: Throwable = null
    +
    +    // If the test doesn't manually start the stream, we do it automatically at the beginning.
    +    val startedManually =
    +      actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
    --- End diff --
    
    Good catch!  I was missing a `!`.  Fixed and made sure that this is tested (by manually starting one of the unit tests).
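
To illustrate why the missing `!` matters, here is a minimal standalone sketch with stand-in action types (hypothetical simplifications of the ones in the diff above). The intent is to inspect only the actions that come before the first one requiring a running stream. The exact form of the fix in the PR is an assumption here.

```scala
// Stand-in types; only the shape of the hierarchy matters for this illustration.
sealed trait StreamAction
trait StreamMustBeRunning
case object StartStream extends StreamAction
case object StopStream extends StreamAction
final case class AddData(values: Seq[Int]) extends StreamAction
final case class CheckAnswer(expected: Seq[Int]) extends StreamAction with StreamMustBeRunning

object StartCheck {
  // As quoted in the diff: takeWhile keeps only leading actions that DO require a
  // running stream, so a StartStream at the front ends the prefix and is never seen.
  def startedManuallyBuggy(actions: Seq[StreamAction]): Boolean =
    actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)

  // With the negation: look at everything before the first action that needs a running stream.
  def startedManually(actions: Seq[StreamAction]): Boolean =
    actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
}
```

With the negation in place, a test that begins with `StartStream` is recognised as manually started, while a test that begins with `AddData` followed by `CheckAnswer` is started automatically.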




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51476807
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  val awaitBatchLock = new Object
    --- End diff --
    
    private




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178419558
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51476822
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  val awaitBatchLock = new Object
    +
    +  @volatile
    +  var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  val minBatchTime = 10
    --- End diff --
    
    private




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51514134
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val currentOffsets = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(currentOffsets.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $currentOffsets")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    +        Thread.sleep(minBatchTime) // TODO: Could be tighter
    +      }
    +    }
    +  }
    +  microBatchThread.setDaemon(true)
    +  microBatchThread.setUncaughtExceptionHandler(
    +    new UncaughtExceptionHandler {
    +      override def uncaughtException(t: Thread, e: Throwable): Unit = {
    +        streamDeathCause = e
    +      }
    +    })
    +  microBatchThread.start()
    +
    +  @volatile
    +  private[sql] var lastExecution: QueryExecution = null
    +  @volatile
    +  private[sql] var streamDeathCause: Throwable = null
    +
    +  /**
    +   * Checks to see if any new data is present in any of the sources.  When new data is available,
    +   * a batch is executed and passed to the sink, updating the currentOffsets.
    +   */
    +  private def attemptBatch(): Unit = {
    +    val startTime = System.nanoTime()
    +
    +    // A list of offsets that need to be updated if this batch is successful.
    +    // Populated while walking the tree.
    +    val newOffsets = new ArrayBuffer[(Source, Offset)]
    +    // A list of attributes that will need to be updated.
    +    var replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Replace sources in the logical plan with data that has arrived since the last batch.
    +    val withNewSources = logicalPlan transform {
    +      case StreamingRelation(source, output) =>
    +        val prevOffset = currentOffsets.get(source)
    +        val newBatch = source.getNextBatch(prevOffset)
    +
    +        newBatch.map { batch =>
    +          newOffsets += ((source, batch.end))
    +          val newPlan = batch.data.logicalPlan
    +
    +          assert(output.size == newPlan.output.size)
    +          replacements ++= output.zip(newPlan.output)
    +          newPlan
    +        }.getOrElse {
    +          LocalRelation(output)
    +        }
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val newPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +    }
    +
    +    if (newOffsets.nonEmpty) {
    +      val optimizerStart = System.nanoTime()
    +
    +      lastExecution = new QueryExecution(sqlContext, newPlan)
    +      val executedPlan = lastExecution.executedPlan
    +      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
    +      logDebug(s"Optimized batch in ${optimizerTime}ms")
    +
    +      StreamExecution.this.synchronized {
    --- End diff --
    
    Changed to lock on the internal `streamProgress`.
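
For illustration, a minimal sketch of the pattern this comment describes: synchronizing on a private internal object rather than on the enclosing instance. `ProgressTracker`, `commit`, and `snapshot` are hypothetical names chosen for the example and are not part of the PR; the real `StreamExecution` code may differ.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a class that tracks per-source progress.
class ProgressTracker {
  // The private map doubles as the monitor, so code outside this class
  // cannot acquire the same lock by synchronizing on the tracker itself.
  private val progress = mutable.HashMap.empty[String, Long]

  // Record the latest offset for a source while holding the internal lock.
  def commit(source: String, offset: Long): Unit = progress.synchronized {
    progress.put(source, offset)
  }

  // Return an immutable copy taken under the same lock.
  def snapshot(): Map[String, Long] = progress.synchronized {
    progress.toMap
  }
}

object LockDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ProgressTracker
    tracker.commit("source-a", 1L)
    tracker.commit("source-a", 2L)
    println(tracker.snapshot())
  }
}
```

Because the monitor is private, a caller that wraps its own code in `tracker.synchronized { ... }` cannot stall a commit.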


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51480355
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    --- End diff --
    
    This should be `@transient`, since `Source` is not `Serializable`.
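
A small sketch of why the annotation matters, under the assumption that the map's keys are not serializable. `FakeSource`, `EagerProgress`, and `TransientProgress` are hypothetical stand-ins, not classes from the PR.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable
import scala.util.Try

// Hypothetical stand-in for Source: deliberately NOT Serializable.
class FakeSource(val name: String)

// The map is written out with the object, so its keys must be serializable.
class EagerProgress extends Serializable {
  val offsets = mutable.HashMap[FakeSource, Long](new FakeSource("a") -> 1L)
}

// The map is skipped by Java serialization because the field is transient.
class TransientProgress extends Serializable {
  @transient val offsets = mutable.HashMap[FakeSource, Long](new FakeSource("a") -> 1L)
}

object TransientDemo {
  // Attempt Java serialization and capture any failure instead of throwing.
  def serialize(value: AnyRef): Try[Array[Byte]] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    println(serialize(new EagerProgress).isSuccess)
    println(serialize(new TransientProgress).isSuccess)
  }
}
```

Marking the field transient makes the write succeed, but the map is simply omitted from the serialized form.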




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51507290
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.execution.datasources.ResolvedDataSource
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +
    +/**
    + * :: Experimental ::
    + * Interface used to start a streaming query execution.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +final class DataStreamWriter private[sql](df: DataFrame) {
    +
    +  /**
    +   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   *
    +   * @since 2.0.0
    +   */
    +  def format(source: String): DataStreamWriter = {
    +    this.source = source
    +    this
    +  }
    +
    +  /**
    +   * Adds an output option for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def option(key: String, value: String): DataStreamWriter = {
    +    this.extraOptions += (key -> value)
    +    this
    +  }
    +
    +  /**
    +   * (Scala-specific) Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
    +    this.extraOptions ++= options
    +    this
    +  }
    +
    +  /**
    +   * Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: java.util.Map[String, String]): DataStreamWriter = {
    +    this.options(options.asScala)
    +    this
    +  }
    +
    +  /**
    +   * Partitions the output by the given columns on the file system. If specified, the output is
    +   * laid out on the file system similar to Hive's partitioning scheme.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def partitionBy(colNames: String*): DataStreamWriter = {
    --- End diff --
    
    It should be tested though.  I would add it in this round since it's going to affect the API of the StreamSinkProvider.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51480080
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    +    with mutable.SynchronizedMap[Source, Offset]
    +
    +  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
    +    currentOffsets.get(source).foreach(old =>
    +      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
    +    currentOffsets.put(source, newOffset)
    +  }
    +
    +  private[streaming] def update(newOffset: (Source, Offset)): Unit =
    +    update(newOffset._1, newOffset._2)
    +
    +  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
    +  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
    +  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
    +
    +  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
    +    val updated = new StreamProgress
    +    currentOffsets.foreach(updated.update)
    +    updates.foreach(updated.update)
    +    updated
    +  }
    +
    +  /**
    +   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
    +   * it should be copied before being passed to user code.
    +   */
    +  private[streaming] def copy(): StreamProgress = {
    +    val copied = new StreamProgress
    +    currentOffsets.foreach(copied.update)
    +    copied
    +  }
    +
    +  override def toString: String =
    +    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case s: StreamProgress =>
    +      s.currentOffsets.keys.toSet == currentOffsets.keys.toSet &&
    --- End diff --
    
    Since `LongOffset` and `CompositeOffset` don't implement `equals` and `hashCode`, `StreamProgress`'s `equals` and `hashCode` don't work.
    
    In addition, why not just call `currentOffsets.map(_._1.toString) == s.currentOffsets.map(_._1.toString)` for `equals`, and `currentOffsets.map(_._1.toString).hashCode` for `hashCode`? `mutable.HashMap` should already do that correctly.
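
To make the first point concrete, here is a minimal sketch of how map equality depends on the value type's `equals` and `hashCode`. `PlainOffset` and `ValueOffset` are hypothetical types for the example; they are not the PR's `LongOffset` or `CompositeOffset`.

```scala
import scala.collection.mutable

// Hypothetical offset without equals/hashCode: falls back to reference equality.
class PlainOffset(val value: Long)

// Hypothetical offset as a case class: structural equals/hashCode are generated.
case class ValueOffset(value: Long)

object EqualityDemo {
  def main(args: Array[String]): Unit = {
    val a = mutable.HashMap("src" -> new PlainOffset(1L))
    val b = mutable.HashMap("src" -> new PlainOffset(1L))
    println(a == b) // false: the two PlainOffset instances are different references

    val c = mutable.HashMap("src" -> ValueOffset(1L))
    val d = mutable.HashMap("src" -> ValueOffset(1L))
    println(c == d) // true: map equality delegates to ValueOffset.equals
    println(c.hashCode == d.hashCode) // true: equal maps hash alike
  }
}
```

Once the offset types compare structurally, delegating to the underlying map is enough; without that, any map-based `equals` compares references.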




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51654640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val streamProgress = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(streamProgress.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $streamProgress")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    --- End diff --
    
    Should this be wrapped in a try block?
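
For context, a hedged sketch of the alternative to a `try` block inside `run()`: letting the exception escape and recording it with an `UncaughtExceptionHandler`, which is the pattern the `StreamExecution` diff in this thread uses for `streamDeathCause`. `HandlerDemo`, `deathCause`, and `runFailingLoop` are hypothetical names for the example.

```scala
import java.lang.Thread.UncaughtExceptionHandler

object HandlerDemo {
  // Set by the handler when the worker thread dies from an exception.
  @volatile var deathCause: Throwable = null

  def runFailingLoop(): Unit = {
    val worker = new Thread("demo batch thread") {
      override def run(): Unit = {
        // No try block here: the exception propagates out of run().
        throw new IllegalStateException("batch failed")
      }
    }
    worker.setDaemon(true)
    worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        deathCause = e
      }
    })
    worker.start()
    // The handler runs on the dying thread, so it has finished once join() returns.
    worker.join()
  }

  def main(args: Array[String]): Unit = {
    runFailingLoop()
    println(deathCause.getMessage)
  }
}
```

The handler records the failure but the loop still ends with the thread; a `try` block inside the loop would be needed only if the stream were meant to keep running after a failed batch.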




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178190775
  
    **[Test build #50498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50498/consoleFull)** for PR 11006 at commit [`7147b3f`](https://github.com/apache/spark/commit/7147b3f8b25e73102a8ba98d81c175acd74f49ca).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51481347
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    --- End diff --
    
    I think we should not make `StreamProgress` extend `Serializable` since we cannot recover it using Java serialization.
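
A minimal sketch of the concern, assuming the offsets map ends up transient: a Java serialization round trip succeeds, but the restored object has lost its state. `Checkpoint` and `RoundTripDemo` are hypothetical names, not the PR's `StreamProgress`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical class for illustration only.
class Checkpoint extends Serializable {
  // Skipped on write, and not re-initialized on read because
  // deserialization does not run this class's constructor.
  @transient val offsets = mutable.HashMap("source-a" -> 42L)
}

object RoundTripDemo {
  // Serialize to a byte array and read the object back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(new Checkpoint)
    println(restored.offsets == null)
  }
}
```

A class that serializes without error yet cannot be recovered this way is arguably misleading to mark `Serializable`, which is the design point raised here.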




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51661213
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val streamProgress = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(streamProgress.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $streamProgress")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    --- End diff --
    
    There is already an uncaught exception handler for this thread.
    
    On Tue, Feb 2, 2016 at 3:29 PM, tedyu <no...@github.com> wrote:
    
    > In
    > sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
    > <https://github.com/apache/spark/pull/11006#discussion_r51654640>:
    >
    > > +      case None => // We are starting this stream for the first time.
    > > +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    > > +    }
    > > +  }
    > > +
    > > +  logInfo(s"Stream running at $streamProgress")
    > > +
    > > +  /** When false, signals to the microBatchThread that it should stop running. */
    > > +  @volatile private var shouldRun = true
    > > +
    > > +  /** The thread that runs the micro-batches of this stream. */
    > > +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    > > +    override def run(): Unit = {
    > > +      SQLContext.setActive(sqlContext)
    > > +      while (shouldRun) {
    > > +        attemptBatch()
    >
    > Should this be wrapped in try block ?





[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178321052
  
    **[Test build #50527 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50527/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51501233
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +import org.scalatest.concurrent.Timeouts
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.streaming._
    +
    +/**
    + * A framework for implementing tests for streaming queries and sources.
    + *
    + * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
    + * blocking as necessary to let the stream catch up.  For example, the following adds some data to
    + * a stream, blocking until it can verify that the correct values are eventually produced.
    + *
    + * {{{
    + *  val inputData = MemoryStream[Int]
    + *  val mapped = inputData.toDS().map(_ + 1)
    + *
    + *  testStream(mapped)(
    + *    AddData(inputData, 1, 2, 3),
    + *    CheckAnswer(2, 3, 4))
    + * }}}
    + *
    + * Note that while we do sleep to allow the other thread to progress without spinning,
    + * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
    + * should check the actual progress of the stream before verifying the required test condition.
    + *
    + * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
    + * avoid hanging forever in the case of failures. However, individual suites can change this
    + * by overriding `streamingTimeout`.
    + */
    +trait StreamTest extends QueryTest with Timeouts {
    +
    +  implicit class RichSource(s: Source) {
    +    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
    +  }
    +
    +  /** How long to wait for an active stream to catch up when checking a result. */
    +  val streamingTimout = 10.seconds
    +
    +  /** A trait for actions that can be performed while testing a streaming DataFrame. */
    +  trait StreamAction
    +
    +  /** A trait to mark actions that require the stream to be actively running. */
    +  trait StreamMustBeRunning
    +
    +  /**
    +   * Adds the given data to the stream.  Subsequent check answers will block until this data has
    +   * been processed.
    +   */
    +  object AddData {
    +    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
    +      AddDataMemory(source, data)
    +  }
    +
    +  /** A trait that can be extended when testing other sources. */
    +  trait AddData extends StreamAction {
    +    def source: Source
    +
    +    /**
    +     * Called to trigger adding the data.  Should return the offset that will denote when this
    +     * new data has been processed.
    +     */
    +    def addData(): Offset
    +  }
    +
    +  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
    +    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
    +
    +    override def addData(): Offset = {
    +      source.addData(data)
    +    }
    +  }
    +
    +  /**
    +   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
    +   * This operation automatically blocks until all added data has been processed.
    +   */
    +  object CheckAnswer {
    +    def apply[A : Encoder](data: A*): CheckAnswerRows = {
    +      val encoder = encoderFor[A]
    +      val toExternalRow = RowEncoder(encoder.schema)
    +      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
    +    }
    +
    +    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
    +  }
    +
    +  case class CheckAnswerRows(expectedAnswer: Seq[Row])
    +      extends StreamAction with StreamMustBeRunning {
    +    override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
    +  }
    +
    +  case class DropBatches(num: Int) extends StreamAction
    +
    +  /** Stops the stream.  It must currently be running. */
    +  case object StopStream extends StreamAction
    +
    +  /** Starts the stream, resuming if data has already been processed.  It must not be running. */
    +  case object StartStream extends StreamAction
    +
    +  /** Signals that a failure is expected and should not kill the test. */
    +  case object ExpectFailure extends StreamAction
    +
    +  /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
    +  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
    +    testStream(stream.toDF())(actions: _*)
    +
    +  /**
    +   * Executes the specified actions on the given streaming DataFrame and provides helpful
    +   * error messages in the case of failures or incorrect answers.
    +   *
    +   * Note that if the stream is not explicitly started before an action that requires it to be
    +   * running then it will be automatically started before performing any other actions.
    +   */
    +  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
    +    var pos = 0
    +    var currentPlan: LogicalPlan = stream.logicalPlan
    +    var currentStream: StreamExecution = null
    +    val awaiting = new mutable.HashMap[Source, Offset]()
    +    val sink = new MemorySink(stream.schema)
    +
    +    @volatile
    +    var streamDeathCause: Throwable = null
    +
    +    // If the test doesn't manually start the stream, we do it automatically at the beginning.
    +    val startedManually =
    +      actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
    +    val startedTest = if (startedManually) actions else StartStream +: actions
    +
    +    def testActions = actions.zipWithIndex.map {
    +      case (a, i) =>
    +        if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
    +          "=> " + a.toString
    +        } else {
    +          "   " + a.toString
    +        }
    +    }.mkString("\n")
    +
    +    def currentOffsets =
    +      if (currentStream != null) currentStream.currentOffsets.toString else "not started"
    +
    +    def threadState =
    +      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
    +    def testState =
    +      s"""
    +         |== Progress ==
    +         |$testActions
    +         |
    +         |== Stream ==
    +         |Stream state: $currentOffsets
    +         |Thread state: $threadState
    +         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
    +         |
    +         |== Sink ==
    +         |$sink
    +         |
    +         |== Plan ==
    +         |${if (currentStream != null) currentStream.lastExecution else ""}
    +         """
    +
    +    def checkState(check: Boolean, error: String) = if (!check) {
    +      fail(
    +        s"""
    +           |Invalid State: $error
    +           |$testState
    +         """.stripMargin)
    +    }
    +
    +    val testThread = Thread.currentThread()
    +
    +    try {
    +      startedTest.foreach { action =>
    +        action match {
    +          case StartStream =>
    +            checkState(currentStream == null, "stream already running")
    +
    +            currentStream = new StreamExecution(sqlContext, stream.logicalPlan, sink)
    +            currentStream.microBatchThread.setUncaughtExceptionHandler(
    --- End diff --
    
    I think you are right, but I would probably defer this to the management API that TD is working on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178205092
  
    Looks great! Just some minor comments.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178146931
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50497/




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178335843
  
    **[Test build #50529 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50529/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51483011
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +import org.scalatest.concurrent.Timeouts
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.streaming._
    +
    +/**
    + * A framework for implementing tests for streaming queries and sources.
    + *
    + * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
    + * blocking as necessary to let the stream catch up.  For example, the following adds some data to
    + * a stream, blocking until it can verify that the correct values are eventually produced.
    + *
    + * {{{
    + *  val inputData = MemoryStream[Int]
    + *  val mapped = inputData.toDS().map(_ + 1)
    + *
    + *  testStream(mapped)(
    + *    AddData(inputData, 1, 2, 3),
    + *    CheckAnswer(2, 3, 4))
    + * }}}
    + *
    + * Note that while we do sleep to allow the other thread to progress without spinning,
    + * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
    + * should check the actual progress of the stream before verifying the required test condition.
    + *
    + * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
    + * avoid hanging forever in the case of failures. However, individual suites can change this
    + * by overriding `streamingTimeout`.
    + */
    +trait StreamTest extends QueryTest with Timeouts {
    +
    +  implicit class RichSource(s: Source) {
    +    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
    +  }
    +
    +  /** How long to wait for an active stream to catch up when checking a result. */
    +  val streamingTimout = 10.seconds
    +
    +  /** A trait for actions that can be performed while testing a streaming DataFrame. */
    +  trait StreamAction
    +
    +  /** A trait to mark actions that require the stream to be actively running. */
    +  trait StreamMustBeRunning
    +
    +  /**
    +   * Adds the given data to the stream.  Subsequent check answers will block until this data has
    +   * been processed.
    +   */
    +  object AddData {
    +    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
    +      AddDataMemory(source, data)
    +  }
    +
    +  /** A trait that can be extended when testing other sources. */
    +  trait AddData extends StreamAction {
    +    def source: Source
    +
    +    /**
    +     * Called to trigger adding the data.  Should return the offset that will denote when this
    +     * new data has been processed.
    +     */
    +    def addData(): Offset
    +  }
    +
    +  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
    +    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
    +
    +    override def addData(): Offset = {
    +      source.addData(data)
    +    }
    +  }
    +
    +  /**
    +   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
    +   * This operation automatically blocks until all added data has been processed.
    +   */
    +  object CheckAnswer {
    +    def apply[A : Encoder](data: A*): CheckAnswerRows = {
    +      val encoder = encoderFor[A]
    +      val toExternalRow = RowEncoder(encoder.schema)
    +      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
    +    }
    +
    +    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
    +  }
    +
    +  case class CheckAnswerRows(expectedAnswer: Seq[Row])
    +      extends StreamAction with StreamMustBeRunning {
    +    override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
    +  }
    +
    +  case class DropBatches(num: Int) extends StreamAction
    +
    +  /** Stops the stream.  It must currently be running. */
    +  case object StopStream extends StreamAction
    +
    +  /** Starts the stream, resuming if data has already been processed.  It must not be running. */
    +  case object StartStream extends StreamAction
    +
    +  /** Signals that a failure is expected and should not kill the test. */
    +  case object ExpectFailure extends StreamAction
    +
    +  /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
    +  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
    +    testStream(stream.toDF())(actions: _*)
    +
    +  /**
    +   * Executes the specified actions on the given streaming DataFrame and provides helpful
    +   * error messages in the case of failures or incorrect answers.
    +   *
    +   * Note that if the stream is not explicitly started before an action that requires it to be
    +   * running then it will be automatically started before performing any other actions.
    +   */
    +  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
    +    var pos = 0
    +    var currentPlan: LogicalPlan = stream.logicalPlan
    +    var currentStream: StreamExecution = null
    +    val awaiting = new mutable.HashMap[Source, Offset]()
    +    val sink = new MemorySink(stream.schema)
    +
    +    @volatile
    +    var streamDeathCause: Throwable = null
    +
    +    // If the test doesn't manually start the stream, we do it automatically at the beginning.
    +    val startedManually =
    +      actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
    --- End diff --
    
    It looks like `startedManually` is always `false`, since `StartStream` is not a `StreamMustBeRunning`.
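
The point above can be checked in isolation. The following is a minimal sketch, assuming simplified stand-ins for the `StreamTest` traits and actions (the object name, the `Seq[Int]` payload, and the `negated` variant are hypothetical and only illustrate one possible fix, not necessarily the one adopted in the PR):

```scala
// Hypothetical, simplified stand-ins for the traits and actions in StreamTest.
object StartedManuallySketch {
  trait StreamAction
  trait StreamMustBeRunning
  case object StartStream extends StreamAction
  case object StopStream extends StreamAction
  case class CheckAnswerRows(expected: Seq[Int])
    extends StreamAction with StreamMustBeRunning

  // The check as written in the diff: takeWhile keeps only leading actions that
  // ARE StreamMustBeRunning, so StartStream can never appear in the result.
  def asWritten(actions: Seq[StreamAction]): Boolean =
    actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)

  // One possible fix (an assumption): inspect the actions that come BEFORE the
  // first action requiring a running stream.
  def negated(actions: Seq[StreamAction]): Boolean =
    actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)

  // A test that starts the stream itself, and one that relies on the automatic start.
  val manual: Seq[StreamAction] = Seq(StartStream, CheckAnswerRows(Seq(1)))
  val automatic: Seq[StreamAction] = Seq(CheckAnswerRows(Seq(1)), StopStream)
}
```

With these definitions, `asWritten` returns `false` for both sequences, while `negated` distinguishes the manually started one.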




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178151805
  
    **[Test build #50498 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50498/consoleFull)** for PR 11006 at commit [`7147b3f`](https://github.com/apache/spark/commit/7147b3f8b25e73102a8ba98d81c175acd74f49ca).




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178418795
  
    **[Test build #50540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50540/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51504800
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val currentOffsets = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(currentOffsets.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $currentOffsets")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    +        Thread.sleep(minBatchTime) // TODO: Could be tighter
    +      }
    +    }
    +  }
    +  microBatchThread.setDaemon(true)
    +  microBatchThread.setUncaughtExceptionHandler(
    +    new UncaughtExceptionHandler {
    +      override def uncaughtException(t: Thread, e: Throwable): Unit = {
    +        streamDeathCause = e
    +      }
    +    })
    +  microBatchThread.start()
    +
    +  @volatile
    +  private[sql] var lastExecution: QueryExecution = null
    +  @volatile
    +  private[sql] var streamDeathCause: Throwable = null
    +
    +  /**
    +   * Checks to see if any new data is present in any of the sources.  When new data is available,
    +   * a batch is executed and passed to the sink, updating the currentOffsets.
    +   */
    +  private def attemptBatch(): Unit = {
    +    val startTime = System.nanoTime()
    +
    +    // A list of offsets that need to be updated if this batch is successful.
    +    // Populated while walking the tree.
    +    val newOffsets = new ArrayBuffer[(Source, Offset)]
    +    // A list of attributes that will need to be updated.
    +    var replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Replace sources in the logical plan with data that has arrived since the last batch.
    +    val withNewSources = logicalPlan transform {
    +      case StreamingRelation(source, output) =>
    +        val prevOffset = currentOffsets.get(source)
    +        val newBatch = source.getNextBatch(prevOffset)
    +
    +        newBatch.map { batch =>
    +          newOffsets += ((source, batch.end))
    +          val newPlan = batch.data.logicalPlan
    +
    +          assert(output.size == newPlan.output.size)
    +          replacements ++= output.zip(newPlan.output)
    +          newPlan
    +        }.getOrElse {
    +          LocalRelation(output)
    +        }
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val newPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +    }
    +
    +    if (newOffsets.nonEmpty) {
    +      val optimizerStart = System.nanoTime()
    +
    +      lastExecution = new QueryExecution(sqlContext, newPlan)
    +      val executedPlan = lastExecution.executedPlan
    +      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
    +      logDebug(s"Optimized batch in ${optimizerTime}ms")
    +
    +      StreamExecution.this.synchronized {
    --- End diff --
    
    nit: it may be safer to use an explicit lock object. This object is exposed to the user, and if someone accidentally uses it as a lock, that could interfere with the logic here.
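
To illustrate the suggestion, here is a minimal sketch, assuming a hypothetical `Query` class in place of `StreamExecution` (the names `batchLock`, `commitBatch`, and `commitWhileInstanceIsLocked` are invented for this example). Because the class synchronizes on a private object, an external caller that holds the instance's own monitor does not block the internal commit path:

```scala
object LockSketch {
  // Hypothetical stand-in for a user-visible query handle.
  class Query {
    // Private monitor: code outside this class cannot synchronize on it.
    private val batchLock = new Object
    private var committed = 0

    // Internal critical section guarded by the private lock, not by `this`.
    def commitBatch(): Int = batchLock.synchronized {
      committed += 1
      committed
    }
  }

  // An external caller holds the instance's own monitor while another thread commits.
  // Returns the worker's result, or -1 if the worker did not finish within 5 seconds.
  def commitWhileInstanceIsLocked(q: Query): Int = {
    var result = -1
    q.synchronized {
      val worker = new Thread(new Runnable {
        override def run(): Unit = { result = q.commitBatch() }
      })
      worker.start()
      // If commitBatch synchronized on `q` itself, the worker would block here
      // until this method released the monitor, and the join would time out.
      worker.join(5000)
    }
    result
  }
}
```

Calling `LockSketch.commitWhileInstanceIsLocked(new LockSketch.Query)` completes promptly because the two monitors are independent.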




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51476990
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  val awaitBatchLock = new Object
    +
    +  @volatile
    +  var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val currentOffsets = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(currentOffsets.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $currentOffsets")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  // TODO: add exception handling to batch thread
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    +        Thread.sleep(minBatchTime) // TODO: Could be tighter
    +      }
    +    }
    +  }
    +  microBatchThread.setDaemon(true)
    +  microBatchThread.start()
    +
    +  @volatile
    +  private[sql] var lastExecution: QueryExecution = null
    +  @volatile
    +  private[sql] var streamDeathCause: Throwable = null
    +
    +  microBatchThread.setUncaughtExceptionHandler(
    --- End diff --
    
    Could you move this statement above `microBatchThread.start()`? Otherwise, we may miss an exception thrown between `microBatchThread.start()` and `microBatchThread.setUncaughtExceptionHandler`.
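
The ordering being requested can be sketched as follows. This is a minimal, self-contained illustration rather than the PR's code: the object name, the thread name, and the immediately failing `run()` body are assumptions chosen to make the race obvious. With the handler installed before `start()`, even a failure on the first instruction of `run()` is recorded:

```scala
import java.lang.Thread.UncaughtExceptionHandler

object HandlerOrderSketch {
  // Records why the background thread died, similar in spirit to streamDeathCause.
  @volatile var deathCause: Throwable = null

  def runFailingThread(): Throwable = {
    // Hypothetical thread that fails as soon as it starts running.
    val t = new Thread("failing thread (hypothetical)") {
      override def run(): Unit = throw new IllegalStateException("boom")
    }
    t.setDaemon(true)
    // Install the handler BEFORE start() so that no failure can slip past it.
    t.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, e: Throwable): Unit = {
        deathCause = e
      }
    })
    t.start()
    // The handler runs on the dying thread, so it has finished once join() returns.
    t.join()
    deathCause
  }
}
```

If the handler were set after `start()`, a thread that failed quickly enough would fall through to the default handler and `deathCause` would stay `null`.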




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178146926
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51652148
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala ---
    @@ -0,0 +1,127 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.execution.datasources.ResolvedDataSource
    +import org.apache.spark.sql.execution.streaming.StreamingRelation
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * :: Experimental ::
    + * An interface for reading streaming data.  Use `sqlContext.streamFrom` to access these methods.
    + *
    + * {{{
    + *   val df = sqlContext.streamFrom
    + *    .format("...")
    + *    .open()
    + * }}}
    + */
    +@Experimental
    +class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
    +
    +  /**
    +   * Specifies the input data source format.
    +   *
    +   * @since 2.0.0
    +   */
    +  def format(source: String): DataStreamReader = {
    +    this.source = source
    +    this
    +  }
    +
    +  /**
    +   * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
    +   * automatically from data. By specifying the schema here, the underlying data stream can
    +   * skip the schema inference step, and thus speed up data reading.
    +   *
    +   * @since 2.0.0
    +   */
    +  def schema(schema: StructType): DataStreamReader = {
    +    this.userSpecifiedSchema = Option(schema)
    +    this
    +  }
    +
    +  /**
    +   * Adds an input option for the underlying data stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  def option(key: String, value: String): DataStreamReader = {
    --- End diff --
    
    Should we add an API that allows users to remove certain options?
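
One possible shape for such a removal API is sketched below. Everything here is hypothetical: `OptionsBuilder` and `removeOption` do not appear in the PR, and the sketch only assumes a builder that stores its options in a mutable map and returns itself for chaining.

```scala
import scala.collection.mutable

object RemovableOptions {
  // Hypothetical builder, not part of the PR: shows one shape a removal API could take.
  final class OptionsBuilder {
    private val extraOptions = new mutable.HashMap[String, String]

    /** Adds or overwrites an option and returns this builder for chaining. */
    def option(key: String, value: String): OptionsBuilder = {
      extraOptions += (key -> value)
      this
    }

    /** Hypothetical: removes an option if present; a no-op otherwise. */
    def removeOption(key: String): OptionsBuilder = {
      extraOptions -= key
      this
    }

    /** Immutable snapshot of the options configured so far. */
    def options: Map[String, String] = extraOptions.toMap
  }
}
```

Making removal a no-op for unknown keys keeps chained calls from failing, at the cost of hiding misspelled keys; throwing instead would be an equally reasonable choice.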


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178720153
  
    LGTM


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11006


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178294673
  
    **[Test build #50509 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50509/consoleFull)** for PR 11006 at commit [`5220ea8`](https://github.com/apache/spark/commit/5220ea8ba171064b328c85ccf20650452af0185d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamProgress `


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178329534
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178144361
  
    test this please


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178730617
  
    Thanks! Merging to master.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178335886
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50529/
    Test FAILed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178323245
  
    test this please


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178419567
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50540/
    Test PASSed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51500216
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    +    with mutable.SynchronizedMap[Source, Offset]
    +
    +  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
    +    currentOffsets.get(source).foreach(old =>
    +      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
    +    currentOffsets.put(source, newOffset)
    +  }
    +
    +  private[streaming] def update(newOffset: (Source, Offset)): Unit =
    +    update(newOffset._1, newOffset._2)
    +
    +  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
    +  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
    +  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
    +
    +  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
    +    val updated = new StreamProgress
    +    currentOffsets.foreach(updated.update)
    +    updates.foreach(updated.update)
    +    updated
    +  }
    +
    +  /**
    +   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
    +   * it should be copied before being passed to user code.
    +   */
    +  private[streaming] def copy(): StreamProgress = {
    +    val copied = new StreamProgress
    +    currentOffsets.foreach(copied.update)
    +    copied
    +  }
    +
    +  override def toString: String =
    +    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case s: StreamProgress =>
    +      s.currentOffsets.keys.toSet == currentOffsets.keys.toSet &&
    --- End diff --
    
    Both `LongOffset` and `CompositeOffset` are case classes, so I think that the default implementations of `hashCode` and `equals` are actually okay.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178386271
  
    test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51511655
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    +    with mutable.SynchronizedMap[Source, Offset]
    +
    +  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
    +    currentOffsets.get(source).foreach(old =>
    +      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
    +    currentOffsets.put(source, newOffset)
    +  }
    +
    +  private[streaming] def update(newOffset: (Source, Offset)): Unit =
    +    update(newOffset._1, newOffset._2)
    +
    +  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
    +  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
    +  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
    +
    +  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
    +    val updated = new StreamProgress
    +    currentOffsets.foreach(updated.update)
    +    updates.foreach(updated.update)
    +    updated
    +  }
    +
    +  /**
    +   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
    +   * it should be copied before being passed to user code.
    +   */
    +  private[streaming] def copy(): StreamProgress = {
    +    val copied = new StreamProgress
    +    currentOffsets.foreach(copied.update)
    +    copied
    +  }
    +
    +  override def toString: String =
    +    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case s: StreamProgress =>
    +      s.currentOffsets.keys.toSet == currentOffsets.keys.toSet &&
    --- End diff --
    
    Changed


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51505200
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +/**
    + * A helper class that looks like a Map[Source, Offset].
    + */
    +class StreamProgress extends Serializable {
    +  private val currentOffsets = new mutable.HashMap[Source, Offset]
    +    with mutable.SynchronizedMap[Source, Offset]
    +
    +  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
    +    currentOffsets.get(source).foreach(old =>
    +      assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
    +    currentOffsets.put(source, newOffset)
    +  }
    +
    +  private[streaming] def update(newOffset: (Source, Offset)): Unit =
    +    update(newOffset._1, newOffset._2)
    +
    +  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
    +  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
    +  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
    +
    +  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
    +    val updated = new StreamProgress
    +    currentOffsets.foreach(updated.update)
    +    updates.foreach(updated.update)
    +    updated
    +  }
    +
    +  /**
    +   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
    +   * it should be copied before being passed to user code.
    +   */
    +  private[streaming] def copy(): StreamProgress = {
    +    val copied = new StreamProgress
    +    currentOffsets.foreach(copied.update)
    +    copied
    +  }
    +
    +  override def toString: String =
    +    currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case s: StreamProgress =>
    +      s.currentOffsets.keys.toSet == currentOffsets.keys.toSet &&
    --- End diff --
    
    > Both LongOffset and CompositeOffset are case classes, so I think that the default implementations of hashCode and equals are actually okay.
    
    Sorry, I didn't notice that.
    
    However, you can use `currentOffsets`'s `hashCode` and `equals` directly instead of implementing them again, right?
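
The delegation suggested above can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `SimpleOffset` and `Progress` are hypothetical stand-ins for the offset case classes and `StreamProgress`, and sources are represented by plain strings.

```scala
import scala.collection.mutable

object DelegatedEquality {
  // Hypothetical stand-in for an offset; case classes get structural equals/hashCode for free.
  final case class SimpleOffset(value: Long)

  // Hypothetical stand-in for StreamProgress that delegates equality to its backing map.
  final class Progress {
    private val offsets = new mutable.HashMap[String, SimpleOffset]

    def update(source: String, offset: SimpleOffset): Unit = {
      offsets.put(source, offset)
      ()
    }

    // Map equality already compares key sets and per-key values,
    // so there is nothing left to implement by hand.
    override def equals(other: Any): Boolean = other match {
      case p: Progress => p.offsets == offsets
      case _ => false
    }

    override def hashCode: Int = offsets.hashCode
  }
}
```

Because Scala map equality ignores insertion order, two instances populated in different orders still compare equal and hash identically.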


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51503397
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    --- End diff --
    
    While working on this codebase, I am finding it mentally tedious to keep mapping the `ContinuousQuery` interface to `StreamExecution`. Intuitively, the latter sounds like a totally new, separate concept compared to `ContinuousQuery`. It may be more intuitive, from a code-readability point of view, to call this `ContinuousQueryImpl`.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51478334
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -0,0 +1,138 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.{Logging, SparkEnv}
    +import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder}
    +import org.apache.spark.sql.types.StructType
    +
    +object MemoryStream {
    +  protected val currentBlockId = new AtomicInteger(0)
    +  protected val memoryStreamId = new AtomicInteger(0)
    +
    +  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
    +    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
    +}
    +
    +/**
    + * A [[Source]] that produces values stored in memory as they are added by the user.  This [[Source]]
    + * is primarily intended for use in unit tests as it can only replay data when the object is still
    + * available.
    + */
    +case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
    +    extends Source with Logging {
    +  protected val encoder = encoderFor[A]
    +  protected val logicalPlan = StreamingRelation(this)
    +  protected val output = logicalPlan.output
    +  protected val batches = new ArrayBuffer[Dataset[A]]
    +  protected var currentOffset: LongOffset = new LongOffset(-1)
    +
    +  protected def blockManager = SparkEnv.get.blockManager
    +
    +  def schema: StructType = encoder.schema
    +
    +  def getCurrentOffset: Offset = currentOffset
    +
    +  def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
    +    new Dataset(sqlContext, logicalPlan)
    +  }
    +
    +  def toDF()(implicit sqlContext: SQLContext): DataFrame = {
    +    new DataFrame(sqlContext, logicalPlan)
    +  }
    +
    +  def addData(data: TraversableOnce[A]): Offset = {
    +    import sqlContext.implicits._
    +    this.synchronized {
    +      currentOffset = currentOffset + 1
    +      val ds = data.toVector.toDS()
    +      logDebug(s"Adding ds: $ds")
    +      batches.append(ds)
    +      currentOffset
    +    }
    +  }
    +
    +  override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized {
    +    val newBlocks =
    +      batches.drop(
    +        start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1)
    +
    +    if (newBlocks.nonEmpty) {
    +      logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}")
    +      val df = newBlocks
    +          .map(_.toDF())
    +          .reduceOption(_ unionAll _)
    +          .getOrElse(sqlContext.emptyDataFrame)
    +
    +      Some(new Batch(currentOffset, df))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
    +}
    +
    +/**
    + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
    + * tests and does not provide durability.
    + */
    +class MemorySink(schema: StructType) extends Sink with Logging {
    +  /** An ordered list of batches that have been written to this [[Sink]]. */
    +  private var batches = new ArrayBuffer[Batch]()
    +
    +  /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */
    +  private val externalRowConverter = RowEncoder(schema)
    +
    +  override def currentOffset: Option[Offset] = synchronized {
    +    batches.lastOption.map(_.end)
    +  }
    +
    +  override def addBatch(nextBatch: Batch): Unit = synchronized {
    +    batches.append(nextBatch)
    +  }
    +
    +  /** Returns all rows that are stored in this [[Sink]]. */
    +  def allData: Seq[Row] = synchronized {
    +    batches
    +        .map(_.data)
    +        .reduceOption(_ unionAll _)
    +        .map(_.collect().toSeq)
    +        .getOrElse(Seq.empty)
    +  }
    +
    +  /**
    +   * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the
    +   * corresponding point in the input. This function can be used when testing to simulate data
    +   * that has been lost due to buffering.
    +   */
    +  def dropBatches(num: Int): Unit = synchronized {
    +    batches.remove(batches.size - num, num)
    --- End diff --
    
    Can be simplified as `batches.dropRight(num)`.
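
One detail worth checking before applying the suggestion above: on a `scala.collection.mutable.ArrayBuffer`, `remove(idx, count)` mutates the buffer in place, whereas `dropRight(n)` returns a new collection and leaves the receiver untouched. Since `batches` is declared as a `var`, the simplification would presumably need to assign the result back. A minimal sketch, using plain `Int`s as stand-ins for `Batch` objects (an assumption made only to keep the example self-contained):

```scala
import scala.collection.mutable.ArrayBuffer

object DropBatchesSketch {
  def main(args: Array[String]): Unit = {
    // Ints stand in for Batch objects (illustrative only).
    var batches = ArrayBuffer(1, 2, 3, 4, 5)
    val num = 2

    // Approach in the diff: removes the last `num` elements in place.
    batches.remove(batches.size - num, num)
    assert(batches == ArrayBuffer(1, 2, 3))

    // dropRight builds a new buffer; the receiver is unchanged.
    val dropped = batches.dropRight(1)
    assert(dropped == ArrayBuffer(1, 2))
    assert(batches == ArrayBuffer(1, 2, 3))

    // For the same effect as remove, the result must be assigned back.
    batches = batches.dropRight(1)
    assert(batches == ArrayBuffer(1, 2))
  }
}
```

Either form works under the surrounding `synchronized` block; the reassignment variant simply swaps the buffer reference instead of editing it.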


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178279347
  
    LGTM, other than the naming issues (StandingQuery, etc. in the code)


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178322981
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51513704
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.execution.datasources.ResolvedDataSource
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +
    +/**
    + * :: Experimental ::
    + * Interface used to start a streaming query execution.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +final class DataStreamWriter private[sql](df: DataFrame) {
    +
    +  /**
    +   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   *
    +   * @since 2.0.0
    +   */
    +  def format(source: String): DataStreamWriter = {
    +    this.source = source
    +    this
    +  }
    +
    +  /**
    +   * Adds an output option for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def option(key: String, value: String): DataStreamWriter = {
    +    this.extraOptions += (key -> value)
    +    this
    +  }
    +
    +  /**
    +   * (Scala-specific) Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
    +    this.extraOptions ++= options
    +    this
    +  }
    +
    +  /**
    +   * Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: java.util.Map[String, String]): DataStreamWriter = {
    +    this.options(options.asScala)
    +    this
    +  }
    +
    +  /**
    +   * Partitions the output by the given columns on the file system. If specified, the output is
    +   * laid out on the file system similar to Hive's partitioning scheme.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def partitionBy(colNames: String*): DataStreamWriter = {
    --- End diff --
    
    Wired it up and added tests.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178191589
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50498/
    Test PASSed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51506712
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.execution.datasources.ResolvedDataSource
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +
    +/**
    + * :: Experimental ::
    + * Interface used to start a streaming query execution.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +final class DataStreamWriter private[sql](df: DataFrame) {
    +
    +  /**
    +   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   *
    +   * @since 2.0.0
    +   */
    +  def format(source: String): DataStreamWriter = {
    +    this.source = source
    +    this
    +  }
    +
    +  /**
    +   * Adds an output option for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def option(key: String, value: String): DataStreamWriter = {
    +    this.extraOptions += (key -> value)
    +    this
    +  }
    +
    +  /**
    +   * (Scala-specific) Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
    +    this.extraOptions ++= options
    +    this
    +  }
    +
    +  /**
    +   * Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: java.util.Map[String, String]): DataStreamWriter = {
    +    this.options(options.asScala)
    +    this
    +  }
    +
    +  /**
    +   * Partitions the output by the given columns on the file system. If specified, the output is
    +   * laid out on the file system similar to Hive's partitioning scheme.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def partitionBy(colNames: String*): DataStreamWriter = {
    --- End diff --
    
    I guess this was to make it consistent with DataFrameWriter
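
For readers unfamiliar with the pattern under discussion, the writer in the diff is a fluent builder: each method records its argument and returns `this`, and `partitionBy` takes varargs in the same way `DataFrameWriter.partitionBy` does. The sketch below is a hypothetical stand-in, not Spark's `DataStreamWriter`; the class name `WriterSketch`, the default format, and the `describe` helper are all assumptions added for illustration.

```scala
import scala.collection.mutable

// Hypothetical builder, not Spark's DataStreamWriter: every method records
// its argument and returns `this` so that calls can be chained.
final class WriterSketch {
  private var source: String = "parquet" // assumed default, for illustration
  private val extraOptions = mutable.LinkedHashMap.empty[String, String]
  private var partitioningColumns: Seq[String] = Seq.empty

  def format(source: String): WriterSketch = {
    this.source = source
    this
  }

  def option(key: String, value: String): WriterSketch = {
    extraOptions += (key -> value)
    this
  }

  // @varargs additionally generates a Java-friendly String... overload.
  @scala.annotation.varargs
  def partitionBy(colNames: String*): WriterSketch = {
    partitioningColumns = colNames.toList
    this
  }

  // Illustrative helper that shows what has been configured so far.
  def describe: String = {
    val opts = extraOptions.map { case (k, v) => s"$k=$v" }.mkString(",")
    s"format=$source options=$opts partitionBy=${partitioningColumns.mkString(",")}"
  }
}

object WriterSketchDemo {
  def main(args: Array[String]): Unit = {
    val w = new WriterSketch()
      .format("json")
      .option("path", "/tmp/out")
      .partitionBy("year", "month")
    assert(w.describe == "format=json options=path=/tmp/out partitionBy=year,month")
  }
}
```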


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178417496
  
    **[Test build #2488 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2488/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51502783
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.execution.datasources.ResolvedDataSource
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +
    +/**
    + * :: Experimental ::
    + * Interface used to start a streaming query execution.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +final class DataStreamWriter private[sql](df: DataFrame) {
    +
    +  /**
    +   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    +   *
    +   * @since 2.0.0
    +   */
    +  def format(source: String): DataStreamWriter = {
    +    this.source = source
    +    this
    +  }
    +
    +  /**
    +   * Adds an output option for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def option(key: String, value: String): DataStreamWriter = {
    +    this.extraOptions += (key -> value)
    +    this
    +  }
    +
    +  /**
    +   * (Scala-specific) Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
    +    this.extraOptions ++= options
    +    this
    +  }
    +
    +  /**
    +   * Adds output options for the underlying data source.
    +   *
    +   * @since 2.0.0
    +   */
    +  def options(options: java.util.Map[String, String]): DataStreamWriter = {
    +    this.options(options.asScala)
    +    this
    +  }
    +
    +  /**
    +   * Partitions the output by the given columns on the file system. If specified, the output is
    +   * laid out on the file system similar to Hive's partitioning scheme.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def partitionBy(colNames: String*): DataStreamWriter = {
    --- End diff --
    
    Is this used anywhere yet? I don't even see tests for it. If it's not used or tested yet, it may be better to leave it out of this first pass.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178267153
  
    **[Test build #50509 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50509/consoleFull)** for PR 11006 at commit [`5220ea8`](https://github.com/apache/spark/commit/5220ea8ba171064b328c85ccf20650452af0185d).


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178329506
  
    **[Test build #50527 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50527/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51483601
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +import org.scalatest.concurrent.Timeouts
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.streaming._
    +
    +/**
    + * A framework for implementing tests for streaming queries and sources.
    + *
    + * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
    + * blocking as necessary to let the stream catch up.  For example, the following adds some data to
    + * a stream, blocking until it can verify that the correct values are eventually produced.
    + *
    + * {{{
    + *  val inputData = MemoryStream[Int]
    + *  val mapped = inputData.toDS().map(_ + 1)
    + *
    + *  testStream(mapped)(
    + *    AddData(inputData, 1, 2, 3),
    + *    CheckAnswer(2, 3, 4))
    + * }}}
    + *
    + * Note that while we do sleep to allow the other thread to progress without spinning,
    + * `StreamAction` checks should not depend on the amount of time spent sleeping.  Instead they
    + * should check the actual progress of the stream before verifying the required test condition.
    + *
    + * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
    + * avoid hanging forever in the case of failures. However, individual suites can change this
    + * by overriding `streamingTimeout`.
    + */
    +trait StreamTest extends QueryTest with Timeouts {
    +
    +  implicit class RichSource(s: Source) {
    +    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
    +  }
    +
    +  /** How long to wait for an active stream to catch up when checking a result. */
    +  val streamingTimout = 10.seconds
    +
    +  /** A trait for actions that can be performed while testing a streaming DataFrame. */
    +  trait StreamAction
    +
    +  /** A trait to mark actions that require the stream to be actively running. */
    +  trait StreamMustBeRunning
    +
    +  /**
    +   * Adds the given data to the stream.  Subsequent check answers will block until this data has
    +   * been processed.
    +   */
    +  object AddData {
    +    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
    +      AddDataMemory(source, data)
    +  }
    +
    +  /** A trait that can be extended when testing other sources. */
    +  trait AddData extends StreamAction {
    +    def source: Source
    +
    +    /**
    +     * Called to trigger adding the data.  Should return the offset that will denote when this
    +     * new data has been processed.
    +     */
    +    def addData(): Offset
    +  }
    +
    +  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
    +    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
    +
    +    override def addData(): Offset = {
    +      source.addData(data)
    +    }
    +  }
    +
    +  /**
    +   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
    +   * This operation automatically blocks until all added data has been processed.
    +   */
    +  object CheckAnswer {
    +    def apply[A : Encoder](data: A*): CheckAnswerRows = {
    +      val encoder = encoderFor[A]
    +      val toExternalRow = RowEncoder(encoder.schema)
    +      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
    +    }
    +
    +    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
    +  }
    +
    +  case class CheckAnswerRows(expectedAnswer: Seq[Row])
    +      extends StreamAction with StreamMustBeRunning {
    +    override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
    +  }
    +
    +  case class DropBatches(num: Int) extends StreamAction
    +
    +  /** Stops the stream.  It must currently be running. */
    +  case object StopStream extends StreamAction
    +
    +  /** Starts the stream, resuming if data has already been processed.  It must not be running. */
    +  case object StartStream extends StreamAction
    +
    +  /** Signals that a failure is expected and should not kill the test. */
    +  case object ExpectFailure extends StreamAction
    +
    +  /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
    +  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
    +    testStream(stream.toDF())(actions: _*)
    +
    +  /**
    +   * Executes the specified actions on the given streaming DataFrame and provides helpful
    +   * error messages in the case of failures or incorrect answers.
    +   *
    +   * Note that if the stream is not explicitly started before an action that requires it to be
    +   * running then it will be automatically started before performing any other actions.
    +   */
    +  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
    +    var pos = 0
    +    var currentPlan: LogicalPlan = stream.logicalPlan
    +    var currentStream: StreamExecution = null
    +    val awaiting = new mutable.HashMap[Source, Offset]()
    +    val sink = new MemorySink(stream.schema)
    +
    +    @volatile
    +    var streamDeathCause: Throwable = null
    +
    +    // If the test doesn't manually start the stream, we do it automatically at the beginning.
    +    val startedManually =
    +      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
    +    val startedTest = if (startedManually) actions else StartStream +: actions
    +
    +    def testActions = actions.zipWithIndex.map {
    +      case (a, i) =>
    +        if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
    +          "=> " + a.toString
    +        } else {
    +          "   " + a.toString
    +        }
    +    }.mkString("\n")
    +
    +    def currentOffsets =
    +      if (currentStream != null) currentStream.currentOffsets.toString else "not started"
    +
    +    def threadState =
    +      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
    +    def testState =
    +      s"""
    +         |== Progress ==
    +         |$testActions
    +         |
    +         |== Stream ==
    +         |Stream state: $currentOffsets
    +         |Thread state: $threadState
    +         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
    +         |
    +         |== Sink ==
    +         |$sink
    +         |
    +         |== Plan ==
    +         |${if (currentStream != null) currentStream.lastExecution else ""}
    +         """
    +
    +    def checkState(check: Boolean, error: String) = if (!check) {
    +      fail(
    +        s"""
    +           |Invalid State: $error
    +           |$testState
    +         """.stripMargin)
    +    }
    +
    +    val testThread = Thread.currentThread()
    +
    +    try {
    +      startedTest.foreach { action =>
    +        action match {
    +          case StartStream =>
    +            checkState(currentStream == null, "stream already running")
    +
    +            currentStream = new StreamExecution(sqlContext, stream.logicalPlan, sink)
    +            currentStream.microBatchThread.setUncaughtExceptionHandler(
    --- End diff --
    
    An exception thrown between `microBatchThread.start()` and `microBatchThread.setUncaughtExceptionHandler` will be missed. How about adding an `UncaughtExceptionHandler` parameter to `StreamExecution`, so that it can be set before `microBatchThread.start()`?
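
The race described above can be avoided by installing the handler before the thread starts. The following is a minimal sketch of that idea, not the actual `StreamExecution` change: `BackgroundRunner` is a hypothetical class that takes the handler as a constructor parameter and sets it ahead of `start()`, so even a failure on the very first instruction of the thread body is reported.

```scala
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for StreamExecution: the handler is a constructor
// parameter, so it is installed before the thread is started.
class BackgroundRunner(body: () => Unit, handler: UncaughtExceptionHandler) {
  private val thread = new Thread("micro-batch-sketch") {
    override def run(): Unit = body()
  }
  thread.setDaemon(true)
  thread.setUncaughtExceptionHandler(handler) // install the handler first ...
  thread.start()                              // ... and only then start

  def join(): Unit = thread.join()
}

object HandlerBeforeStartSketch {
  def main(args: Array[String]): Unit = {
    val cause = new AtomicReference[Throwable](null)
    val handler = new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = cause.set(e)
    }
    // The body fails immediately, which is the case the race could miss.
    val runner =
      new BackgroundRunner(() => throw new IllegalStateException("boom"), handler)
    // The handler runs on the dying thread, so it has completed once join returns.
    runner.join()
    assert(cause.get().getMessage == "boom")
  }
}
```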


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178329536
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50527/
    Test FAILed.


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51476814
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,213 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  val awaitBatchLock = new Object
    +
    +  @volatile
    +  var batchRun = false
    --- End diff --
    
    private
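
One way to read the one-word suggestion above is that the flag should not be writable from outside the class. A rough sketch of that encapsulation follows; it assumes `batchRun` is meant to be set when a batch completes and consumed by a waiter holding `awaitBatchLock`, which the quoted diff does not show. `BatchSignal`, `markBatchRun`, and `awaitBatch` are hypothetical names.

```scala
// Hypothetical helper: the flag and monitor are private, and callers can
// only signal a completed batch or wait for one.
class BatchSignal {
  private val awaitBatchLock = new Object

  @volatile
  private var batchRun = false

  /** Records that a batch has run and wakes up any waiters. */
  def markBatchRun(): Unit = awaitBatchLock.synchronized {
    batchRun = true
    awaitBatchLock.notifyAll()
  }

  /** Waits up to `timeoutMs` for a batch; returns true if one ran, then resets the flag. */
  def awaitBatch(timeoutMs: Long): Boolean = awaitBatchLock.synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    // Loop to tolerate spurious wakeups; never call wait(0), which blocks forever.
    while (!batchRun && remaining > 0) {
      awaitBatchLock.wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    val ran = batchRun
    batchRun = false
    ran
  }
}

object BatchSignalSketch {
  def main(args: Array[String]): Unit = {
    val signal = new BatchSignal
    // Nothing has run yet, so a short wait times out.
    assert(!signal.awaitBatch(10))

    val t = new Thread(new Runnable {
      override def run(): Unit = signal.markBatchRun()
    })
    t.start()
    assert(signal.awaitBatch(5000))
    t.join()
  }
}
```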


[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178390554
  
    **[Test build #50540 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50540/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178191582
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51503402
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val currentOffsets = new StreamProgress
    --- End diff --
    
    Please rename `currentOffsets` to `streamProgress`.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51654597
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    +    sqlContext: SQLContext,
    +    private[sql] val logicalPlan: LogicalPlan,
    +    val sink: Sink) extends ContinuousQuery with Logging {
    +
    +  /** A monitor used to wait/notify when batches complete. */
    +  private val awaitBatchLock = new Object
    +
    +  @volatile
    +  private var batchRun = false
    +
    +  /** Minimum amount of time in between the start of each batch. */
    +  private val minBatchTime = 10
    +
    +  /** Tracks how much data we have processed from each input source. */
    +  private[sql] val streamProgress = new StreamProgress
    +
    +  /** All stream sources present in the query plan. */
    +  private val sources =
    +    logicalPlan.collect { case s: StreamingRelation => s.source }
    +
    +  // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
    +  // that we have already processed).
    +  {
    +    sink.currentOffset match {
    +      case Some(c: CompositeOffset) =>
    +        val storedProgress = c.offsets
    +        val sources = logicalPlan collect {
    +          case StreamingRelation(source, _) => source
    +        }
    +
    +        assert(sources.size == storedProgress.size)
    +        sources.zip(storedProgress).foreach { case (source, offset) =>
    +          offset.foreach(streamProgress.update(source, _))
    +        }
    +      case None => // We are starting this stream for the first time.
    +      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +    }
    +  }
    +
    +  logInfo(s"Stream running at $streamProgress")
    +
    +  /** When false, signals to the microBatchThread that it should stop running. */
    +  @volatile private var shouldRun = true
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread("stream execution thread") {
    +    override def run(): Unit = {
    +      SQLContext.setActive(sqlContext)
    +      while (shouldRun) {
    +        attemptBatch()
    +        Thread.sleep(minBatchTime) // TODO: Could be tighter
    +      }
    +    }
    +  }
    +  microBatchThread.setDaemon(true)
    +  microBatchThread.setUncaughtExceptionHandler(
    +    new UncaughtExceptionHandler {
    +      override def uncaughtException(t: Thread, e: Throwable): Unit = {
    +        streamDeathCause = e
    +      }
    +    })
    +  microBatchThread.start()
    +
    +  @volatile
    +  private[sql] var lastExecution: QueryExecution = null
    +  @volatile
    +  private[sql] var streamDeathCause: Throwable = null
    +
    +  /**
    +   * Checks to see if any new data is present in any of the sources.  When new data is available,
    +   * a batch is executed and passed to the sink, updating the streamProgress.
    +   */
    +  private def attemptBatch(): Unit = {
    +    val startTime = System.nanoTime()
    +
    +    // A list of offsets that need to be updated if this batch is successful.
    +    // Populated while walking the tree.
    +    val newOffsets = new ArrayBuffer[(Source, Offset)]
    +    // A list of attributes that will need to be updated.
    +    var replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Replace sources in the logical plan with data that has arrived since the last batch.
    +    val withNewSources = logicalPlan transform {
    +      case StreamingRelation(source, output) =>
    +        val prevOffset = streamProgress.get(source)
    +        val newBatch = source.getNextBatch(prevOffset)
    +
    +        newBatch.map { batch =>
    +          newOffsets += ((source, batch.end))
    +          val newPlan = batch.data.logicalPlan
    +
    +          assert(output.size == newPlan.output.size)
    +          replacements ++= output.zip(newPlan.output)
    +          newPlan
    +        }.getOrElse {
    +          LocalRelation(output)
    +        }
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val newPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
    +    }
    +
    +    if (newOffsets.nonEmpty) {
    +      val optimizerStart = System.nanoTime()
    +
    +      lastExecution = new QueryExecution(sqlContext, newPlan)
    +      val executedPlan = lastExecution.executedPlan
    +      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
    +      logDebug(s"Optimized batch in ${optimizerTime}ms")
    +
    +      streamProgress.synchronized {
    +        // Update the offsets and calculate a new composite offset
    +        newOffsets.foreach(streamProgress.update)
    +        val newStreamProgress = logicalPlan.collect {
    +          case StreamingRelation(source, _) => streamProgress.get(source)
    +        }
    +        val batchOffset = CompositeOffset(newStreamProgress)
    +
    +        // Construct the batch and send it to the sink.
    +        val nextBatch = new Batch(batchOffset, new DataFrame(sqlContext, newPlan))
    +        sink.addBatch(nextBatch)
    +      }
    +
    +      batchRun = true
    +      awaitBatchLock.synchronized {
    +        // Wake up any threads that are waiting for the stream to progress.
    +        awaitBatchLock.notifyAll()
    +      }
    +
    +      val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
    +      logInfo(s"Compete up to $newOffsets in ${batchTime}ms")
    --- End diff --
    
    Typo: "Compete" (presumably meant to be "Completed").
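
The control flow of `attemptBatch()` in the diff above (ask the source for data after the last offset, hand it to the sink, record the new offset, otherwise do nothing) can be sketched without Spark as follows. Everything here is a simplified assumption: `SimpleBatch`, `InMemorySource` and `MicroBatchRunner` are hypothetical names, offsets are plain `Long` counters, and there is a single source rather than a composite offset over several.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free stand-ins; none of these names come from the PR.
final case class SimpleBatch(end: Long, data: Seq[Int])

class InMemorySource {
  private val records = mutable.ArrayBuffer.empty[Int]

  def add(values: Int*): Unit = synchronized { records ++= values }

  /** Returns the data that arrived after `prev`, or None if nothing is new. */
  def getNextBatch(prev: Option[Long]): Option[SimpleBatch] = synchronized {
    val start = prev.getOrElse(0L).toInt
    if (records.size > start) {
      Some(SimpleBatch(records.size.toLong, records.drop(start).toList))
    } else {
      None
    }
  }
}

class MicroBatchRunner(source: InMemorySource, sink: mutable.Buffer[SimpleBatch]) {
  private var offset: Option[Long] = None

  /** Runs one batch if new data is available; returns true if a batch was run. */
  def attemptBatch(): Boolean = source.getNextBatch(offset) match {
    case Some(batch) =>
      sink += batch            // hand the results to the sink
      offset = Some(batch.end) // then remember how far we have read
      true
    case None =>
      false                    // nothing new; the caller sleeps and retries
  }
}
```

Advancing the offset only after the sink has accepted the batch is a choice of this sketch; the PR updates its progress and calls `sink.addBatch` inside one `synchronized` block instead.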




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178335884
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178295397
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50509/
    Test PASSed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178143411
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50496/
    Test FAILed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178295383
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178327196
  
    **[Test build #50529 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50529/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178143406
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178390314
  
    **[Test build #2488 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2488/consoleFull)** for PR 11006 at commit [`735e113`](https://github.com/apache/spark/commit/735e113dfd0a2eda60f01d8974c6f00df34c415f).




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11006#issuecomment-178322983
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50526/
    Test FAILed.




[GitHub] spark pull request: [SPARK-10820][SQL] Support for the continuous ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51507133
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
    + * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
    + * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
    + * and the results are committed transactionally to the given [[Sink]].
    + */
    +class StreamExecution(
    --- End diff --
    
    My goals with the naming here were twofold:
     - Mirror `QueryExecution`, which has a similar purpose in the existing batch execution pipeline.
     - Have some separation in naming, since it is likely that there will be more than one backend that shares a common public interface.  I think it's also possible that we should not have this inherit from `ContinuousQuery`; instead, `ContinuousQuery` should just hold an instance of some specific execution strategy.  For this reason I'd probably avoid the Impl naming.
    
    I was considering naming this `BatchStreamExecution` or `MicroBatchExecution` or something similar to make that clearer, though.  Thoughts?
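
To make the "hold an instance" alternative concrete, here is a rough sketch of composition instead of inheritance. All names are hypothetical illustrations (`ExecutionStrategy`, `ContinuousQueryHandle`, and this toy `MicroBatchExecution`); none of them is the API in this PR, and a real strategy would of course run batches rather than just track a flag.

```scala
// All names below are hypothetical illustrations, not the PR's API.
trait ExecutionStrategy {
  def name: String
  def stop(): Unit
  def isActive: Boolean
}

/** One possible backend; others could implement the same trait. */
class MicroBatchExecution extends ExecutionStrategy {
  @volatile private var running = true

  val name: String = "micro-batch"

  def stop(): Unit = { running = false }

  def isActive: Boolean = running
}

/** Public handle that holds a strategy instead of being extended by one. */
class ContinuousQueryHandle(strategy: ExecutionStrategy) {
  def stop(): Unit = strategy.stop()

  def isActive: Boolean = strategy.isActive

  def describe: String = s"query running on ${strategy.name} backend"
}
```

With this shape, the public interface stays the same when another backend is added, and the backend's class name is free to describe how it executes.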

