Posted to reviews@spark.apache.org by jose-torres <gi...@git.apache.org> on 2018/05/15 18:13:03 UTC

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/21337

    [SPARK-24234][SS] Reader for continuous processing shuffle

    ## What changes were proposed in this pull request?
    
    Read RDD for continuous processing shuffle, as well as the initial RPC-based row receiver.
    
    https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii
    
    ## How was this patch tested?
    
    new unit tests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark readerRddMaster

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21337
    
----
commit 1d6b71898e2a640e3c0809695d2b83f3f84eaa38
Author: Jose Torres <to...@...>
Date:   2018-05-15T18:07:54Z

    continuous shuffle read RDD

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90735/
    Test PASSed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189409608
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (reader: ContinuousShuffleReader, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(queueSize, env)
    +    val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID().toString}", receiver)
    --- End diff --
    
    nit: don't need `toString` when it's interpolated in the string.
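
The point of the nit can be checked in isolation. The following is a minimal sketch, not code from the PR: the object `EndpointNames` and its two methods are hypothetical names introduced only for illustration. It shows that Scala's `s` interpolator already converts an embedded value to its string form, so both spellings produce the same endpoint name.

```scala
import java.util.UUID

// Hypothetical helper, not part of the PR: compares the two spellings.
object EndpointNames {
  // Explicit .toString inside the interpolation, as in the quoted diff.
  def verbose(id: UUID): String = s"UnsafeRowReceiver-${id.toString}"

  // The s-interpolator converts the embedded value to a string on its own.
  def concise(id: UUID): String = s"UnsafeRowReceiver-$id"
}
```

For any `id`, `EndpointNames.verbose(id)` and `EndpointNames.concise(id)` should be equal, which is why the shorter form is sufficient.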


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188456856
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    + */
    +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
    +    extends RDD[UnsafeRow](sc, Nil) {
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
    +
    +    new NextIterator[UnsafeRow] {
    +      override def getNext(): UnsafeRow = receiver.poll() match {
    +        case ReceiverRow(r) => r
    +        case ReceiverEpochMarker() =>
    --- End diff --
    
    It should, but I think that's significant enough to justify its own PR. Added an explicit TODO to be safe.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90706/
    Test FAILed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    @HeartSaVioR @arunmahadevan @xuanyuanking 


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188456692
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    --- End diff --
    
    Well, ContinuousShuffleReadRDD is a bit self-documenting as a reader. Added that it's receiving shuffle data from upstream tasks.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    @tdas 


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189125545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)
    --- End diff --
    
    It might be easier to understand the information flow if the queue size value is passed directly through the RDD, as opposed to magically getting it through `SQLConf.get` (which leads to hard-to-debug issues like "why is my conf from my session not being used?").
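
A minimal sketch of the suggestion, assuming nothing beyond the Scala and Java standard libraries: `SketchReceiver`, `SketchPartition`, and `SketchReadRDD` are hypothetical stand-ins for the classes in the diff, not Spark APIs. The queue size is chosen once where the RDD is constructed and travels as a plain constructor argument, so no ambient configuration lookup is needed where the receiver is created.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for the receiver: the capacity is a plain argument.
final class SketchReceiver(val queueSize: Int) {
  val queue = new ArrayBlockingQueue[String](queueSize)
}

// The partition carries the value it was built with, so the receiver it
// lazily creates does not consult any session or thread-local configuration.
final case class SketchPartition(index: Int, queueSize: Int) {
  lazy val receiver: SketchReceiver = new SketchReceiver(queueSize)
}

// The caller decides the queue size once and hands it to every partition.
final class SketchReadRDD(numPartitions: Int, queueSize: Int) {
  val partitions: Seq[SketchPartition] =
    (0 until numPartitions).map(i => SketchPartition(i, queueSize))
}
```

With this shape, a question like "which queue size is this partition using?" can be answered by reading the partition's own field rather than by tracing which configuration was active on the executor.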


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90817 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90817/testReport)** for PR 21337 at commit [`00f910e`](https://github.com/apache/spark/commit/00f910ea39b76a24e1e21acdf3d6a20fd7784fa9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188831887
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.next().getInt(0) == 111)
    +    assert(iter.next().getInt(0) == 222)
    +    assert(iter.next().getInt(0) == 333)
    +    assert(!iter.hasNext)
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.next().getInt(0) == 111)
    +    assert(!firstEpoch.hasNext)
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.next().getInt(0) == 222)
    +    assert(secondEpoch.next().getInt(0) == 333)
    +    assert(!secondEpoch.hasNext)
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.next().getInt(0) == 111)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    --- End diff --
    
    Added the simple one. I agree we don't need to reiterate all of them; RDD partitions being independent is pretty well enforced by the framework.
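
The epoch tests quoted above expect each `compute()` call to yield the rows up to the next epoch marker and then stop. The quoted diffs show only part of the real iterator, so the following is a standalone sketch of that behaviour under assumed names (`SketchMsg`, `SketchRow`, `SketchMarker`, `EpochReader` are all hypothetical), built on a plain `LinkedBlockingQueue` rather than the RPC receiver.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical message types mirroring "a row or an epoch marker".
sealed trait SketchMsg
final case class SketchRow(value: Int) extends SketchMsg
case object SketchMarker extends SketchMsg

final class EpochReader(queue: LinkedBlockingQueue[SketchMsg]) {
  // Returns a lazy iterator over the rows of one epoch. It blocks on take()
  // while the queue is empty and consumes the marker that ends the epoch.
  // The iterator should be fully drained before asking for the next epoch.
  def nextEpoch(): Iterator[Int] =
    Iterator
      .continually(queue.take())
      .takeWhile(_ != SketchMarker)
      .collect { case SketchRow(v) => v }
}
```

Enqueueing row 111, a marker, rows 222 and 333, and two more markers, then draining `nextEpoch()` three times, mirrors the "multiple epochs" and "empty epochs" cases: the third call finds a marker immediately and yields nothing.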


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90735 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90735/testReport)** for PR 21337 at commit [`955ac79`](https://github.com/apache/spark/commit/955ac79eb05dc389e632d1aaa6c59396835c6ed5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90814/
    Test PASSed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188397958
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task.
    + */
    +private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
    +    extends ThreadSafeRpcEndpoint with Logging {
    +  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
    --- End diff --
    
    1. It may be good to state the assumption that the queue will be continuously drained irrespective of the markers, etc.
    2. Can the queue size be made configurable?
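
Both points concern how a bounded queue behaves when nothing drains it. The quoted diff does not show how the receiver enqueues messages, so this is only an illustration of `java.util.concurrent.ArrayBlockingQueue` itself; `BoundedQueueSketch` and `acceptedWithoutDrain` are hypothetical names. Once the queue is at capacity, the non-blocking `offer` returns `false`, and a blocking `put` would wait until a consumer removes an element.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper, not part of the PR.
object BoundedQueueSketch {
  // Counts how many of `attempts` offers are accepted by a queue with the
  // given capacity when no consumer ever removes an element.
  def acceptedWithoutDrain(capacity: Int, attempts: Int): Int = {
    val queue = new ArrayBlockingQueue[Int](capacity)
    // offer() never blocks: it returns false once the queue is full.
    (0 until attempts).count(i => queue.offer(i))
  }
}
```

With a capacity of 4 and 10 attempts only the first 4 offers succeed, which is why a fixed capacity is safe only if the consumer keeps draining, and why making the capacity a parameter is useful.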
    



---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189409482
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  // In this unit test, we emulate that we're in the task thread where
    +  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
    +  // thread local to be set.
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    +
    +  test("multiple partitions") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 5)
    +    // Send all data before processing to ensure there's no crossover.
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      // Send index for identification.
    +      part.endpoint.askSync[Unit](ReceiverRow(unsafeRow(part.index)))
    +      part.endpoint.askSync[Unit](ReceiverEpochMarker())
    +    }
    +
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      val iter = rdd.compute(part, ctx)
    +      assert(iter.next().getInt(0) == part.index)
    +      assert(!iter.hasNext)
    +    }
    +  }
    +
    +  test("blocks waiting for new rows") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +
    +    val readRow = new Thread {
    +      override def run(): Unit = {
    +        // set the non-inheritable thread local
    +        TaskContext.setTaskContext(ctx)
    +        val epoch = rdd.compute(rdd.partitions(0), ctx)
    +        epoch.next().getInt(0)
    +      }
    +    }
    +
    +    readRow.start()
    +    eventually(timeout(streamingTimeout)) {
    +      assert(readRow.getState == Thread.State.WAITING)
    +    }
    +  }
    --- End diff --
    
    with a finally.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90656/
    Test FAILed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21337


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188632188
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    + */
    +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
    +    extends RDD[UnsafeRow](sc, Nil) {
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
    +
    +    new NextIterator[UnsafeRow] {
    +      override def getNext(): UnsafeRow = receiver.poll() match {
    +        case ReceiverRow(r) => r
    +        case ReceiverEpochMarker() =>
    --- End diff --
    
    Does this ensure at-least-once semantics? If so, we could start from this and improve it in another PR, as @jose-torres stated.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90706 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90706/testReport)** for PR 21337 at commit [`2ea8a6f`](https://github.com/apache/spark/commit/2ea8a6f94216e8b184e5780ec3e6ffb2838de382).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Actually, now that I think about it, there's a lot of value in having that interface, both to make sure we don't accidentally leak across it and possibly to allow a debugging mode in the future. I'll add it.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189129070
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(SQLConf.get.continuousStreamingExecutorQueueSize, env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    --- End diff --
    
    It would be better for debugging if the endpoint name had a prefix like "UnsafeRowReceiver" that distinguishes it from other endpoints.
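
    One way to read this suggestion, as a minimal sketch only: keep the random UUID for uniqueness but put a readable label in front of it. The `EndpointNames` object and its `unique` method are hypothetical names introduced for illustration; they are not part of the PR or of Spark.

```scala
import java.util.UUID

// Hypothetical helper (not from the PR): builds a unique endpoint name
// that still says what kind of endpoint it belongs to.
object EndpointNames {
  // `prefix` is any label that identifies the endpoint type,
  // for example "UnsafeRowReceiver".
  def unique(prefix: String): String = s"$prefix-${UUID.randomUUID()}"
}
```

    The resulting string could then be passed wherever the diff currently passes `UUID.randomUUID().toString`, so that endpoint names in logs are recognizable at a glance.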


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Other than a few nits, LGTM.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189410027
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  // In this unit test, we emulate that we're in the task thread where
    +  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
    +  // thread local to be set.
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    --- End diff --
    
    simply
    `def send(end: ThreadsafeRPCEndpoint, messages: UnsafeRowReceiverMessage*) { ... }`
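
    A rough sketch of what such a varargs helper might look like. To stay compilable without Spark it uses stand-in types: `Message`, `Row`, `EpochMarker` and `FakeEndpoint` are hypothetical and only mimic the shape of the PR's `UnsafeRowReceiverMessage` hierarchy and of an endpoint with an `askSync` call.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-ins for the PR's message types, so the sketch compiles without Spark.
sealed trait Message
final case class Row(value: Int) extends Message
case object EpochMarker extends Message

// Stand-in for an RPC endpoint reference; askSync here only records the message.
final class FakeEndpoint {
  val received = ArrayBuffer.empty[Message]
  def askSync(message: Message): Unit = received += message
}

object SendHelper {
  // One call replaces a run of askSync lines and preserves message order.
  def send(endpoint: FakeEndpoint, messages: Message*): Unit =
    messages.foreach(endpoint.askSync)
}
```

    With a helper of this shape, a test body such as "multiple epochs" would shrink to a single `send(endpoint, ...)` line listing rows and markers in order.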


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90761/
    Test FAILed.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90656/testReport)** for PR 21337 at commit [`1d6b718`](https://github.com/apache/spark/commit/1d6b71898e2a640e3c0809695d2b83f3f84eaa38).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ContinuousShuffleReadPartition(index: Int) extends Partition `
      * `class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)`


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188392937
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    --- End diff --
    
    Incomplete comment?
    
    "Bottom" is a bit ambiguous. Can we explicitly state what's at the bottom (reader or writer), and whether this is receiving the shuffle data from downstream?


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90817/
    Test PASSed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189133964
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle
    + * writers will send rows here, with continuous shuffle readers polling for new rows as needed.
    + *
    + * TODO: Support multiple source tasks. We need to output a single epoch marker once all
    + * source tasks have sent one.
    + */
    +private[shuffle] class UnsafeRowReceiver(
    +      queueSize: Int,
    +      override val rpcEnv: RpcEnv)
    +    extends ThreadSafeRpcEndpoint with Logging {
    +  // Note that this queue will be drained from the main task thread and populated in the RPC
    +  // response thread.
    +  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
    +  var stopped = new AtomicBoolean(false)
    --- End diff --
    
    Restricted visibility and made it a proper val.
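
    The updated line is not shown in this thread, so the following is only a sketch of the kind of change described, under assumptions: `shuffleSketch` stands in for the `shuffle` package, and `Receiver` and `isStopped` are hypothetical names. The two points it illustrates are that the `AtomicBoolean` reference never needs reassigning (so `val` suffices) and that a qualified `private` keeps the flag readable by nearby code such as tests while hiding it from everything else.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// `shuffleSketch` stands in for the `shuffle` package; all names are hypothetical.
object shuffleSketch {
  class Receiver {
    // val: the reference is fixed; only the boolean inside it is mutated.
    // private[shuffleSketch]: visible inside the enclosing scope, hidden elsewhere.
    private[shuffleSketch] val stopped = new AtomicBoolean(false)

    def stop(): Unit = stopped.set(true)
  }

  // Lives inside the same scope, so it is allowed to read the flag.
  def isStopped(r: Receiver): Boolean = r.stopped.get()
}
```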


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189130600
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.next().getInt(0) == 111)
    +    assert(iter.next().getInt(0) == 222)
    +    assert(iter.next().getInt(0) == 333)
    +    assert(!iter.hasNext)
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.next().getInt(0) == 111)
    +    assert(!firstEpoch.hasNext)
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.next().getInt(0) == 222)
    +    assert(secondEpoch.next().getInt(0) == 333)
    +    assert(!secondEpoch.hasNext)
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.next().getInt(0) == 111)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    +
    +  test("multiple partitions") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 5)
    +    for (p <- rdd.partitions) {
    --- End diff --
    
    I would rather add data to all the partitions at once and then try to read from all of them. This would test that each partition has its own distinct receiver, correctly configured. That's the real point of a multi-partition test.
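
    A minimal sketch of that test shape, kept free of Spark so it runs on its own. Plain blocking queues stand in for the per-partition receivers; `MultiPartitionSketch`, `Row`, `EpochMarker`, `readEpoch` and `roundTrip` are hypothetical names for illustration, not part of the PR. All partitions are written first, and only then read back, so a receiver shared between partitions would show up as rows landing in the wrong place.

    ```scala
    import java.util.concurrent.ArrayBlockingQueue

    object MultiPartitionSketch {
      // Hypothetical stand-ins for the PR's message types.
      sealed trait Msg
      final case class Row(value: Int) extends Msg
      case object EpochMarker extends Msg

      // One independent queue per partition ("one receiver per partition").
      def newReceivers(numPartitions: Int): Vector[ArrayBlockingQueue[Msg]] =
        Vector.fill(numPartitions)(new ArrayBlockingQueue[Msg](16))

      // Drain one epoch: collect rows until the epoch marker arrives.
      def readEpoch(queue: ArrayBlockingQueue[Msg]): List[Int] = {
        val rows = List.newBuilder[Int]
        var done = false
        while (!done) {
          queue.take() match {
            case Row(v)      => rows += v
            case EpochMarker => done = true
          }
        }
        rows.result()
      }

      // Write to every partition first, then read from every partition.
      def roundTrip(numPartitions: Int): Vector[List[Int]] = {
        val receivers = newReceivers(numPartitions)
        for ((q, i) <- receivers.zipWithIndex) {
          q.put(Row(i * 10))
          q.put(EpochMarker)
        }
        receivers.map(readEpoch)
      }
    }
    ```

    In the real suite the same shape would presumably go through `rdd.partitions` and `rdd.compute`, with a distinct value per partition so a cross-wired receiver fails the assertion.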


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188397203
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task.
    --- End diff --
    
    It may be good to add who the senders and receivers are, to give an idea of where this fits in.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188396282
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    + */
    +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
    +    extends RDD[UnsafeRow](sc, Nil) {
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
    +
    +    new NextIterator[UnsafeRow] {
    +      override def getNext(): UnsafeRow = receiver.poll() match {
    +        case ReceiverRow(r) => r
    +        case ReceiverEpochMarker() =>
    --- End diff --
    
    Shouldn't this wait for epoch markers from all child tasks?
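
    A minimal sketch of what waiting for all markers could look like, written as plain Scala over an in-memory message sequence. The `writerId` field, `Marker`, and `alignEpochs` are assumptions made for illustration; the PR's messages carry no writer identity. An epoch is closed only once every writer has sent its marker, and anything a writer sends after its own marker is held back for the next epoch.

    ```scala
    object EpochAlignmentSketch {
      // Hypothetical messages that, unlike the PR's, identify the sending writer.
      sealed trait Msg { def writerId: Int }
      final case class Row(writerId: Int, value: Int) extends Msg
      final case class Marker(writerId: Int) extends Msg

      // Groups row values into epochs; an epoch ends when all writers have sent a marker.
      def alignEpochs(numWriters: Int, incoming: Seq[Msg]): List[List[Int]] = {
        val epochs = List.newBuilder[List[Int]]
        var current = Vector.empty[Int] // rows of the epoch being assembled
        var marked = Set.empty[Int]     // writers whose marker has been seen this epoch
        var pending = Vector.empty[Msg] // messages from writers already past their marker

        def handle(msg: Msg): Unit =
          if (marked.contains(msg.writerId)) pending :+= msg
          else msg match {
            case Row(_, v) => current :+= v
            case Marker(w) =>
              marked += w
              if (marked.size == numWriters) {
                // Every writer has finished this epoch: emit it and start the next one.
                epochs += current.toList
                current = Vector.empty
                marked = Set.empty
                val replay = pending
                pending = Vector.empty
                replay.foreach(handle)
              }
          }

        incoming.foreach(handle)
        epochs.result()
      }
    }
    ```

    With two writers, a row that writer 0 sends after its own marker but before writer 1's marker ends up in the second epoch rather than the first.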


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188829780
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task.
    + */
    +private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
    +    extends ThreadSafeRpcEndpoint with Logging {
    +  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
    --- End diff --
    
    There's actually an existing config I forgot to pull in. (I think it makes sense to use the same config for all continuous read buffers unless and until we see a need to split it.)
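
    A minimal sketch of the shape this implies: the buffer capacity becomes a constructor argument instead of the hard-coded 1024, so the caller can pass in whatever the config says. `BoundedReceiver` and its methods are hypothetical, and the sketch deliberately does not name the config, since this thread does not.

    ```scala
    import java.util.concurrent.ArrayBlockingQueue

    object BoundedReceiverSketch {
      // Hypothetical stand-in for the receiver, with an injected queue capacity.
      final class BoundedReceiver[T <: AnyRef](queueSize: Int) {
        private val queue = new ArrayBlockingQueue[T](queueSize)

        // Non-blocking enqueue; returns false when the buffer is full.
        def offer(msg: T): Boolean = queue.offer(msg)

        // Non-blocking dequeue; None when the buffer is empty.
        def poll(): Option[T] = Option(queue.poll())

        // Free slots left in the buffer.
        def remainingCapacity: Int = queue.remainingCapacity()
      }
    }
    ```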


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90735/testReport)** for PR 21337 at commit [`955ac79`](https://github.com/apache/spark/commit/955ac79eb05dc389e632d1aaa6c59396835c6ed5).


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90762 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90762/testReport)** for PR 21337 at commit [`de21b1c`](https://github.com/apache/spark/commit/de21b1c25a333d44c0521fe151b468e51f0bdc47).


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188628202
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    --- End diff --
    
    If my understanding is right, the bottom would be the RDD that is injected just before shuffling, so it would be neither reader nor writer.
    
    `first` and `last` would be good alternatives for me if `bottom` looks ambiguous.
    
    As @arunmahadevan stated, the comment looks incomplete.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188638980
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.next().getInt(0) == 111)
    +    assert(iter.next().getInt(0) == 222)
    +    assert(iter.next().getInt(0) == 333)
    +    assert(!iter.hasNext)
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.next().getInt(0) == 111)
    +    assert(!firstEpoch.hasNext)
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.next().getInt(0) == 222)
    +    assert(secondEpoch.next().getInt(0) == 333)
    +    assert(!secondEpoch.hasNext)
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.next().getInt(0) == 111)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    --- End diff --
    
    It may be better to add test(s) for multiple partitions. I guess we don't need to repeat all of the tests; just a simple one with multiple partitions to ensure all RPC endpoints are working properly.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189127139
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task. Continuous shuffle
    + * writers will send rows here, with continuous shuffle readers polling for new rows as needed.
    + *
    + * TODO: Support multiple source tasks. We need to output a single epoch marker once all
    + * source tasks have sent one.
    + */
    +private[shuffle] class UnsafeRowReceiver(
    +      queueSize: Int,
    +      override val rpcEnv: RpcEnv)
    +    extends ThreadSafeRpcEndpoint with Logging {
    +  // Note that this queue will be drained from the main task thread and populated in the RPC
    +  // response thread.
    +  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](queueSize)
    +  var stopped = new AtomicBoolean(false)
    --- End diff --
    
    Why is this a var? And why is it public?
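
    A minimal sketch of one alternative, assuming callers only need to observe the flag: keep the `AtomicBoolean` in a private `val` and expose a read-only accessor. `Receiver`, `isStopped` and `stop()` are hypothetical names here, not the PR's API.

    ```scala
    import java.util.concurrent.atomic.AtomicBoolean

    object StoppedFlagSketch {
      final class Receiver {
        // `val`: the reference never changes; only the boolean inside it does.
        private val stopped = new AtomicBoolean(false)

        // Read-only view for callers such as tests.
        def isStopped: Boolean = stopped.get()

        // Idempotent stop: returns true only for the call that actually flipped the flag.
        def stop(): Boolean = stopped.compareAndSet(false, true)
      }
    }
    ```

    A test could then assert on `receiver.isStopped` instead of reaching into the atomic itself.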


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90762 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90762/testReport)** for PR 21337 at commit [`de21b1c`](https://github.com/apache/spark/commit/de21b1c25a333d44c0521fe151b468e51f0bdc47).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90761/testReport)** for PR 21337 at commit [`97f7e8f`](https://github.com/apache/spark/commit/97f7e8ff865e6054d0d70914ce9bb51880b161f6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait ContinuousShuffleReader `


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188829815
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    --- End diff --
    
    Clarified and completed the comment.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188604001
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    --- End diff --
    
    "Bottom is a bit ambiguous" +1 for this. 


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90706 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90706/testReport)** for PR 21337 at commit [`2ea8a6f`](https://github.com/apache/spark/commit/2ea8a6f94216e8b184e5780ec3e6ffb2838de382).


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90814 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90814/testReport)** for PR 21337 at commit [`154843d`](https://github.com/apache/spark/commit/154843d799683c5cdfc035033475f223f85f0d66).


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90762/
    Test PASSed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188682238
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    + */
    +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
    +    extends RDD[UnsafeRow](sc, Nil) {
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
    +
    +    new NextIterator[UnsafeRow] {
    +      override def getNext(): UnsafeRow = receiver.poll() match {
    +        case ReceiverRow(r) => r
    +        case ReceiverEpochMarker() =>
    --- End diff --
    
    If the task does not wait for markers from all its children, it cannot guarantee at-least-once processing.
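
To illustrate the concern: with several upstream writers, an epoch can only be closed once a marker has arrived from every one of them. The following is a minimal sketch under stated assumptions, not the patch's implementation. `EpochAligner` and the integer writer ids are hypothetical names introduced here for illustration.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of the patch: tracks which upstream writers
// have sent their marker for the current epoch.
class EpochAligner(numWriters: Int) {
  private val seen = mutable.Set.empty[Int]

  // Records a marker from `writerId`. Returns true only when markers from all
  // writers have arrived, at which point the epoch may be closed.
  def onMarker(writerId: Int): Boolean = {
    require(writerId >= 0 && writerId < numWriters, s"unknown writer $writerId")
    seen += writerId
    if (seen.size == numWriters) {
      seen.clear() // start tracking the next epoch
      true
    } else {
      false
    }
  }
}
```

The sketch assumes no writer sends its marker for the next epoch before every writer has finished the current one. A real reader would also have to buffer or block rows from writers that run ahead.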


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90814 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90814/testReport)** for PR 21337 at commit [`154843d`](https://github.com/apache/spark/commit/154843d799683c5cdfc035033475f223f85f0d66).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ContinuousShuffleReadRDD(`


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189409455
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  // In this unit test, we emulate that we're in the task thread where
    +  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
    +  // thread local to be set.
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    +
    +  test("multiple partitions") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 5)
    +    // Send all data before processing to ensure there's no crossover.
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      // Send index for identification.
    +      part.endpoint.askSync[Unit](ReceiverRow(unsafeRow(part.index)))
    +      part.endpoint.askSync[Unit](ReceiverEpochMarker())
    +    }
    +
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      val iter = rdd.compute(part, ctx)
    +      assert(iter.next().getInt(0) == part.index)
    +      assert(!iter.hasNext)
    +    }
    +  }
    +
    +  test("blocks waiting for new rows") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +
    +    val readRow = new Thread {
    --- End diff --
    
    nit: not a row. It's a thread.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    LGTM. Will merge after tests pass.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188601016
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task.
    + */
    +private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
    --- End diff --
    
    override val rpcEnv here?
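
For context on the question: when a constructor parameter implements an abstract `val` from a parent trait, Scala accepts it with or without the `override` modifier, so the choice is mostly stylistic. This is a minimal sketch that assumes the member is abstract in the parent. `Endpoint`, `WithoutOverride` and `WithOverride` are hypothetical names, not Spark classes.

```scala
// Hypothetical trait with an abstract val, standing in for an endpoint trait
// that requires implementors to supply an environment.
trait Endpoint {
  val env: String
}

// Implements the abstract member without the `override` modifier.
class WithoutOverride(val env: String) extends Endpoint

// Same behavior, with the modifier stating the intent explicitly.
class WithOverride(override val env: String) extends Endpoint
```

The explicit form has the advantage that the compiler will complain if the parent member is later renamed or removed.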


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188636306
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +
    +/**
    + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker.
    + */
    +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
    +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage
    +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage
    +
    +/**
    + * RPC endpoint for receiving rows into a continuous processing shuffle task.
    + */
    +private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
    +    extends ThreadSafeRpcEndpoint with Logging {
    +  private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
    --- End diff --
    
    I guess we can handle 2 as a TODO if we would like to focus on the proposed patch.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90705 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90705/testReport)** for PR 21337 at commit [`46456dc`](https://github.com/apache/spark/commit/46456dc75a6aec9659b18523c421999debd060eb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90705 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90705/testReport)** for PR 21337 at commit [`46456dc`](https://github.com/apache/spark/commit/46456dc75a6aec9659b18523c421999debd060eb).


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90656/testReport)** for PR 21337 at commit [`1d6b718`](https://github.com/apache/spark/commit/1d6b71898e2a640e3c0809695d2b83f3f84eaa38).


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189409872
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  // In this unit test, we emulate that we're in the task thread where
    +  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
    +  // thread local to be set.
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    --- End diff --
    
    It would be super nice if there were a function that allowed this to be written as
    `send(endpoint, ReceiverRow(unsafeRow(111)), ReceiverEpochMarker(), ReceiverRow(unsafeRow(222)), ReceiverRow(unsafeRow(333)), ...)`
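
A minimal sketch of what such a helper could look like. To stay self-contained it uses stand-ins rather than Spark's classes: the `Endpoint` trait and `RecordingEndpoint` are hypothetical, and rows are simplified to `Int`. In the suite itself the helper body would presumably call `endpoint.askSync[Unit](...)` on each message, as the tests above do.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-ins for the message types in the diff (rows simplified to Int).
sealed trait UnsafeRowReceiverMessage
case class ReceiverRow(row: Int) extends UnsafeRowReceiverMessage
case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage

// Hypothetical minimal endpoint, standing in for the RPC endpoint reference.
trait Endpoint {
  def askSync(message: UnsafeRowReceiverMessage): Unit
}

// Records messages in arrival order so the helper can be checked without an RPC environment.
class RecordingEndpoint extends Endpoint {
  val received = ArrayBuffer.empty[UnsafeRowReceiverMessage]
  override def askSync(message: UnsafeRowReceiverMessage): Unit = received += message
}

object SendHelper {
  // Varargs helper: sends each message synchronously, in the order given.
  def send(endpoint: Endpoint, messages: UnsafeRowReceiverMessage*): Unit = {
    messages.foreach(endpoint.askSync)
  }
}
```

With this shape, the five `askSync` calls in the "multiple epochs" test would collapse into a single `send(endpoint, ...)` call listing the rows and markers in order.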


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90817 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90817/testReport)** for PR 21337 at commit [`00f910e`](https://github.com/apache/spark/commit/00f910ea39b76a24e1e21acdf3d6a20fd7784fa9).


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r188829894
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import java.util.UUID
    +
    +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.util.NextIterator
    +
    +case class ContinuousShuffleReadPartition(index: Int) extends Partition {
    +  // Initialized only on the executor, and only once even as we call compute() multiple times.
    +  lazy val (receiver, endpoint) = {
    +    val env = SparkEnv.get.rpcEnv
    +    val receiver = new UnsafeRowReceiver(env)
    +    val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
    +    TaskContext.get().addTaskCompletionListener { ctx =>
    +      env.stop(endpoint)
    +    }
    +    (receiver, endpoint)
    +  }
    +}
    +
    +/**
    + * RDD at the bottom of each continuous processing shuffle task, reading from the
    + */
    +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
    +    extends RDD[UnsafeRow](sc, Nil) {
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    (0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver
    +
    +    new NextIterator[UnsafeRow] {
    +      override def getNext(): UnsafeRow = receiver.poll() match {
    +        case ReceiverRow(r) => r
    +        case ReceiverEpochMarker() =>
    --- End diff --
    
    The way I see it is that the current implementation only supports one UnsafeRow source, and the followup PR will extend it to multiple.


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    **[Test build #90761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90761/testReport)** for PR 21337 at commit [`97f7e8f`](https://github.com/apache/spark/commit/97f7e8ff865e6054d0d70914ce9bb51880b161f6).


---



[GitHub] spark issue #21337: [SPARK-24234][SS] Reader for continuous processing shuff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21337
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90705/
    Test FAILed.


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189129893
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.next().getInt(0) == 111)
    +    assert(iter.next().getInt(0) == 222)
    +    assert(iter.next().getInt(0) == 333)
    +    assert(!iter.hasNext)
    --- End diff --
    
    This can be compressed to `assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))`.
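
    A minimal sketch of the suggested pattern, runnable without Spark. `FakeRow` and `firstColumn` are hypothetical stand-ins introduced here for illustration and are not part of the PR; the point is only that draining the iterator into a sequence covers both the per-element checks and the final `!iter.hasNext` check in one assertion.

```scala
// Hypothetical stand-in for a Spark row; only getInt(0) is modelled.
final case class FakeRow(value: Int) {
  def getInt(ordinal: Int): Int = {
    require(ordinal == 0, "FakeRow has a single column")
    value
  }
}

object CompressedAssertion {
  // Drains the iterator and extracts column 0 from every row.
  // toList forces evaluation, so the iterator is exhausted afterwards.
  def firstColumn(rows: Iterator[FakeRow]): Seq[Int] =
    rows.map(_.getInt(0)).toList

  def main(args: Array[String]): Unit = {
    val iter = Iterator(FakeRow(111), FakeRow(222), FakeRow(333))
    // One assertion replaces three next() checks plus the hasNext check.
    assert(firstColumn(iter) == Seq(111, 222, 333))
    assert(!iter.hasNext)
  }
}
```

    Extracting the integer before comparing matters: the iterator yields rows, so comparing it directly against `Seq(111, 222, 333)` would compare rows with integers.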


---



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21337#discussion_r189409378
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous.shuffle
    +
    +import org.apache.spark.{TaskContext, TaskContextImpl}
    +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.types.{DataType, IntegerType}
    +
    +class ContinuousShuffleReadSuite extends StreamTest {
    +
    +  private def unsafeRow(value: Int) = {
    +    UnsafeProjection.create(Array(IntegerType : DataType))(
    +      new GenericInternalRow(Array(value: Any)))
    +  }
    +
    +  // In this unit test, we emulate that we're in the task thread where
    +  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a task context
    +  // thread local to be set.
    +  var ctx: TaskContextImpl = _
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    ctx = TaskContext.empty()
    +    TaskContext.setTaskContext(ctx)
    +  }
    +
    +  override def afterEach(): Unit = {
    +    ctx.markTaskCompleted(None)
    +    TaskContext.unset()
    +    ctx = null
    +    super.afterEach()
    +  }
    +
    +  test("receiver stopped with row last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("receiver stopped with marker last") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    ctx.markTaskCompleted(None)
    +    val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].reader
    +    eventually(timeout(streamingTimeout)) {
    +      assert(receiver.asInstanceOf[UnsafeRowReceiver].stopped.get())
    +    }
    +  }
    +
    +  test("one epoch") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val iter = rdd.compute(rdd.partitions(0), ctx)
    +    assert(iter.toSeq.map(_.getInt(0)) == Seq(111, 222, 333))
    +  }
    +
    +  test("multiple epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(firstEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(secondEpoch.toSeq.map(_.getInt(0)) == Seq(222, 333))
    +  }
    +
    +  test("empty epochs") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +    endpoint.askSync[Unit](ReceiverEpochMarker())
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +
    +    val thirdEpoch = rdd.compute(rdd.partitions(0), ctx)
    +    assert(thirdEpoch.toSeq.map(_.getInt(0)) == Seq(111))
    +
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +    assert(rdd.compute(rdd.partitions(0), ctx).isEmpty)
    +  }
    +
    +  test("multiple partitions") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 5)
    +    // Send all data before processing to ensure there's no crossover.
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      // Send index for identification.
    +      part.endpoint.askSync[Unit](ReceiverRow(unsafeRow(part.index)))
    +      part.endpoint.askSync[Unit](ReceiverEpochMarker())
    +    }
    +
    +    for (p <- rdd.partitions) {
    +      val part = p.asInstanceOf[ContinuousShuffleReadPartition]
    +      val iter = rdd.compute(part, ctx)
    +      assert(iter.next().getInt(0) == part.index)
    +      assert(!iter.hasNext)
    +    }
    +  }
    +
    +  test("blocks waiting for new rows") {
    +    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
    +
    +    val readRow = new Thread {
    +      override def run(): Unit = {
    +        // set the non-inheritable thread local
    +        TaskContext.setTaskContext(ctx)
    +        val epoch = rdd.compute(rdd.partitions(0), ctx)
    +        epoch.next().getInt(0)
    +      }
    +    }
    +
    +    readRow.start()
    +    eventually(timeout(streamingTimeout)) {
    +      assert(readRow.getState == Thread.State.WAITING)
    +    }
    +  }
    --- End diff --
    
    nit: terminate the thread before the test ends.
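
    One way this could look, as a sketch under stated assumptions rather than the change that was actually made in the PR: the blocked read is modelled with a `LinkedBlockingQueue` (a hypothetical stand-in for the row source), and it is assumed that the blocking call responds to `Thread.interrupt()`. `TerminateBlockedThread` and `runAndTerminate` are illustrative names only.

```scala
import java.util.concurrent.LinkedBlockingQueue

object TerminateBlockedThread {
  // Starts a reader that blocks on an empty queue, waits until it is parked,
  // then interrupts and joins it. Returns true if the thread has terminated.
  def runAndTerminate(): Boolean = {
    // Hypothetical stand-in for the row source; nothing is ever enqueued.
    val queue = new LinkedBlockingQueue[Int]()
    val reader = new Thread {
      override def run(): Unit = {
        try {
          queue.take() // blocks until interrupted
          ()
        } catch {
          case _: InterruptedException => () // expected during cleanup
        }
      }
    }

    reader.start()
    try {
      // Poll (with a deadline) until the reader is parked in take().
      val deadline = System.nanoTime() + 10L * 1000 * 1000 * 1000
      while (reader.getState != Thread.State.WAITING && System.nanoTime() < deadline) {
        Thread.sleep(10)
      }
      assert(reader.getState == Thread.State.WAITING)
    } finally {
      // Cleanup runs even if the assertion above fails.
      reader.interrupt()
      reader.join(10000)
    }
    !reader.isAlive
  }
}
```

    Putting the interrupt and join in a `finally` block keeps the thread from outliving the test even when the state assertion fails.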


---
