You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mukulmurthy <gi...@git.apache.org> on 2018/06/28 22:27:10 UTC

[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

GitHub user mukulmurthy opened a pull request:

    https://github.com/apache/spark/pull/21662

    [SPARK-24662][SQL][SS] Support limit in structured streaming

    ## What changes were proposed in this pull request?
    
    Support the LIMIT operator in structured streaming.
    
    For streams in append or complete output mode, a stream with a LIMIT operator will return no more than the specified number of rows. LIMIT is still unsupported for the update output mode.
    
    ## How was this patch tested?
    
    New and existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mukulmurthy/oss-spark SPARK-24662

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21662
    
----
commit 94b93f735b32fc6be0add4ac710987f626e46e1a
Author: Mukul Murthy <mu...@...>
Date:   2018-06-26T23:06:45Z

    Revert "[SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink"
    
    This reverts commit e4fee395ecd93ad4579d9afbf0861f82a303e563.

commit 1ba894a83297ee1e728511c4e90e10e1abc8fcaf
Author: Mukul Murthy <mu...@...>
Date:   2018-06-26T23:38:32Z

    Add tests for streaming with LIMIT

commit b006f4a739eac54f191a904b64ad814881fa70e5
Author: Mukul Murthy <mu...@...>
Date:   2018-06-27T17:11:32Z

    Updated the unsupported operation checker and added test for Update mode

commit 66e1073b9e87f0fd604c8d43651264753a701279
Author: Mukul Murthy <mu...@...>
Date:   2018-06-27T23:33:05Z

    Scaffold StreamingLimitExec and insert it into query plan for streams

commit 585e5b3cf7fb6f182c02c7204ca80e6da78b203c
Author: Mukul Murthy <mu...@...>
Date:   2018-06-28T18:00:06Z

    Make StreamingLimitExec read and store state

commit 904d4379424e351711034d7294baa1a4955625cd
Author: Mukul Murthy <mu...@...>
Date:   2018-06-28T19:51:42Z

    Implement the limit and add more tests

commit 82b72286cc6eefdee078d99668dfb2b44f634ed5
Author: Mukul Murthy <mu...@...>
Date:   2018-06-28T22:18:18Z

    Cleaned up implementation, better testing

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    @tdas @marmbrus @jose-torres for review


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92654/testReport)** for PR 21662 at commit [`d12fb1f`](https://github.com/apache/spark/commit/d12fb1fa67707440ab88ce793bd724e4fb27ad0b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92440 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92440/testReport)** for PR 21662 at commit [`82b7228`](https://github.com/apache/spark/commit/82b72286cc6eefdee078d99668dfb2b44f634ed5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199286570
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    --- End diff --
    
    Oh and we should be planning a `LocalLimit` before this and perhaps `GlobalStreamingLimitExec` would be a better name to make the functionality obvious.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199606669
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    --- End diff --
    
    Can chat more about this one offline but from talking to TD it doesn't sound like there's a simple fix for this 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199283820
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala ---
    @@ -70,35 +68,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
         checkAnswer(sink.allData, 1 to 9)
       }
     
    -  test("directly add data in Append output mode with row limit") {
    --- End diff --
    
    Nit: I'd kinda prefer reverting as a separate PR


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92440 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92440/testReport)** for PR 21662 at commit [`82b7228`](https://github.com/apache/spark/commit/82b72286cc6eefdee078d99668dfb2b44f634ed5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r201097931
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned. This operator is meant for streams in Append mode only.
    + */
    +case class StreamingGlobalLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None,
    +    outputMode: Option[OutputMode] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    assert(outputMode.isDefined && outputMode.get == InternalOutputModes.Append,
    +      "StreamingGlobalLimitExec is only valid for streams in Append output mode")
    +
    +    child.execute().mapPartitionsWithStateStore(
    +        getStateInfo,
    +        keySchema,
    +        valueSchema,
    +        indexOrdinal = None,
    +        sqlContext.sessionState,
    +        Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    --- End diff --
    
    can you comment that rowCount is the cumulative count? or maybe name it `numCumulativeOutputRows`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92654/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199279216
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    --- End diff --
    
    Nit: I'd indent 4 above to distinguish these two blocks visually.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199278369
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -72,6 +72,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
                 if limit < conf.topKSortFallbackThreshold =>
               TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
    +        case Limit(IntegerLiteral(limit), child) if plan.isStreaming =>
    +          StreamingLimitExec(limit, planLater(child)) :: Nil
    --- End diff --
    
    I would create a different one only to continue the pattern of isolating streaming specific Strategies.  You'll then need to inject your new Strategy in `IncrementalExecution`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199278862
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    +        if (x) {
    +          rowCount += 1
    +        }
    +        x
    +      }
    +
    +      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
    --- End diff --
    
    Do you need these type parameters?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199006704
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -72,6 +72,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
                 if limit < conf.topKSortFallbackThreshold =>
               TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
    +        case Limit(IntegerLiteral(limit), child) if plan.isStreaming =>
    +          StreamingLimitExec(limit, planLater(child)) :: Nil
    --- End diff --
    
    I didn't need to create a separate Strategy for this because there's already this Strategy for limits, but I can create a different one if needed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199280992
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -615,7 +615,7 @@ class StreamSuite extends StreamTest {
         // Get an existing checkpoint generated by Spark v2.1.
         // v2.1 does not record # shuffle partitions in the offset metadata.
         val resourceUri =
    -      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    --- End diff --
    
    I'd undo these spurious changes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92771 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92771/testReport)** for PR 21662 at commit [`06ceaf9`](https://github.com/apache/spark/commit/06ceaf940598ad158d156b41c8357b83ebeec9d0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199300670
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala ---
    @@ -70,35 +68,9 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
         checkAnswer(sink.allData, 1 to 9)
       }
     
    -  test("directly add data in Append output mode with row limit") {
    --- End diff --
    
    I thought about it, but I didn't want to have the feature not working (even though no one is probably using it), and figured it would be easier this way since there's basically no file overlap between the two (except for a test class).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199279042
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    --- End diff --
    
    Existing: Do we really do this in every operator?  Why isn't this the responsibility of the parent class?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92536 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92536/testReport)** for PR 21662 at commit [`8671944`](https://github.com/apache/spark/commit/8671944b801907b2dced7027ea3da3fb04ed2e8f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92440/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r200532939
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -354,6 +355,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         }
       }
     
    +  /**
    +   * Used to plan the streaming global limit operator.
    +   * We need to check for either a direct Limit or a Limit wrapped in a ReturnAnswer operator,
    +   * following the example of the SpecialLimits Strategy above, because we want to use
    +   * the normal limit exec for streams that are not in Append mode.
    --- End diff --
    
    Can you also add comments on why we handle only append mode?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r200532848
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
    @@ -136,6 +137,11 @@ class IncrementalExecution(
                   j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,
                   Some(offsetSeqMetadata.batchWatermarkMs))
             )
    +
    +      case l: StreamingGlobalLimitExec =>
    +        l.copy(
    +        stateInfo = Some(nextStatefulOperationStateInfo),
    --- End diff --
    
    incorrect indent.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r200454145
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -354,6 +355,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         }
       }
     
    +  /**
    +   * Used to plan the streaming global limit operator.
    --- End diff --
    
    As discussed offline, comment on why we need ReturnAnswer handling


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92654/testReport)** for PR 21662 at commit [`d12fb1f`](https://github.com/apache/spark/commit/d12fb1fa67707440ab88ce793bd724e4fb27ad0b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199286349
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    --- End diff --
    
    I think its okay due to `override def requiredChildDistribution: Seq[Distribution] = AllTuples :: Nil`.
    
    +1 to making sure there are tests with more than one partition though.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199284336
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    --- End diff --
    
    Isn't this going to result in `streamLimit` records in each partition? I would expect we'd need something like the Global/LocalLimit split.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199278041
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -315,8 +315,10 @@ object UnsupportedOperationChecker {
             case GroupingSets(_, _, child, _) if child.isStreaming =>
               throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
     
    -        case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
    -          throwError("Limits are not supported on streaming DataFrames/Datasets")
    +        case GlobalLimit(_, _) | LocalLimit(_, _) if
    +          subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
    --- End diff --
    
    It is today (though as we discussed I think the query planner would be a better place if we were to rearchitect).
    
    Style nit: line break at the high syntactic level (i.e. before the if) and indent 4 space for a continuation like this (to distinguish the guard from the code executed when matched.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199284559
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -805,6 +806,75 @@ class StreamSuite extends StreamTest {
         }
       }
     
    +  test("streaming limit without state") {
    --- End diff --
    
    Related to my above comment, I think all of these tests end up only testing a single input partition.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92771/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199582352
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    +        if (x) {
    +          rowCount += 1
    +        }
    +        x
    +      }
    +
    +      CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
    --- End diff --
    
    yup :(


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21662


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r200454033
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -805,6 +806,75 @@ class StreamSuite extends StreamTest {
         }
       }
     
    +  test("streaming limit without state") {
    --- End diff --
    
    resolved


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199288285
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    --- End diff --
    
    Oh, I missed that distribution. Makes sense then.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92536 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92536/testReport)** for PR 21662 at commit [`8671944`](https://github.com/apache/spark/commit/8671944b801907b2dced7027ea3da3fb04ed2e8f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21662: [SPARK-24662][SQL][SS] Support limit in structure...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199006582
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -315,8 +315,10 @@ object UnsupportedOperationChecker {
             case GroupingSets(_, _, child, _) if child.isStreaming =>
               throwError("GroupingSets is not supported on streaming DataFrames/Datasets")
     
    -        case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
    -          throwError("Limits are not supported on streaming DataFrames/Datasets")
    +        case GlobalLimit(_, _) | LocalLimit(_, _) if
    +          subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
    --- End diff --
    
    Is this the right way / place to do this check?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92536/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Jenkins, retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92778 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92778/testReport)** for PR 21662 at commit [`06ceaf9`](https://github.com/apache/spark/commit/06ceaf940598ad158d156b41c8357b83ebeec9d0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92771 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92771/testReport)** for PR 21662 at commit [`06ceaf9`](https://github.com/apache/spark/commit/06ceaf940598ad158d156b41c8357b83ebeec9d0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92778/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21662: [SPARK-24662][SQL][SS] Support limit in structured strea...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21662
  
    **[Test build #92778 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92778/testReport)** for PR 21662 at commit [`06ceaf9`](https://github.com/apache/spark/commit/06ceaf940598ad158d156b41c8357b83ebeec9d0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org