You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2014/10/31 00:06:38 UTC

[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/3026

    [SPARK-4029][Streaming] Update streaming driver to reliably save and recover received block metadata on driver failures

    As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub task of modifying the streaming driver to reliably save received block metadata, and recover them on driver restart.
    
    This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`, and any actions on them (e.g, allocating blocks to batches, etc.). All actions to block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replaying to recreate the pre-failure state of the `ReceivedBlockTracker`, which include the batch-to-block allocations and the unallocated blocks. 
    
    Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s gets recreated with the recovered info, and jobs submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.
    
    This is still a WIP. Things that are missing here are.
    - Unit test that tests the driver recovery, by killing and restarting the streaming context, and verifying all the input data gets processed.
    - Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark driver-ha-rbt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3026.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3026
    
----
commit 7ae0a7fb1301a346921235bb4c661c92d71536f3
Author: Tathagata Das <ta...@gmail.com>
Date:   2014-10-30T22:41:13Z

    Transferred changes from driver-ha-working branch

commit 25611d6c29dd4115fc17b4e36bffa33eaac8e2a6
Author: Tathagata Das <ta...@gmail.com>
Date:   2014-10-30T22:56:50Z

    Minor changes before submitting PR

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19698295
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       def stop() {}
     
    -  /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
    +  /**
    +   * Generates RDDs with blocks received by the receiver of this stream. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    -    }
    -  }
    +    val blockRDD = {
     
    -  /** Get information on received blocks. */
    -  private[streaming] def getReceivedBlockInfo(time: Time) = {
    -    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
    +      if (validTime < graph.startTime) {
    +        // If this is called for any time before the start time of the context,
    +        // then this returns an empty RDD. This may happen when recovering from a
    +        // driver failure without any write ahead log to recover pre-failure data.
    +        new BlockRDD[T](ssc.sc, Array.empty)
    +      } else {
    +        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    +        // for this batch
    +        val blockInfos =
    +          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
    +        val blockStoreResults = blockInfos.map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +
    +        // Check whether all the results are of the same type
    +        val resultTypes = blockStoreResults.map { _.getClass }.distinct
    +        if (resultTypes.size > 1) {
    +          logWarning("Multiple result types in block information, WAL information will be ignored.")
    +        }
    +
    +        // If all the results are of type WriteAheadLogBasedStoreResult, then create
    +        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
    +        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
    +          val logSegments = blockStoreResults.map {
    +            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
    +          }.toArray
    +          // Since storeInBlockManager = false, the storage level does not matter.
    +          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
    +            blockIds, logSegments, storeInBlockManager = false, StorageLevel.NONE)
    --- End diff --
    
    If we are not storing in the BlockManager on recovery, does that param even make sense? Shouldn't we be storing the recovered data in the BM - if the same RDD is used for two distinct transformations, wouldn't it help if the data is in the BM?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61345817
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22654/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61345576
  
    This looks good. Apart from the one question I had above, this looks good to go


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696840
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +
    +import scala.Some
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.{implicitConversions, postfixOps}
    +import scala.util.Random
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.scheduler._
    +import org.apache.spark.streaming.util.WriteAheadLogSuite._
    +import org.apache.spark.streaming.util.{WriteAheadLogReader, Clock, ManualClock, SystemClock}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
    +import org.apache.spark.storage.StreamBlockId
    +import scala.Some
    +import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
    +
    +class ReceivedBlockTrackerSuite
    +  extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +
    +  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
    +  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
    +
    +  val hadoopConf = new Configuration()
    +  val akkaTimeout = 10 seconds
    +  val streamId = 1
    +
    +  var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
    +  var checkpointDirectory: File = null
    +
    +  before {
    +    checkpointDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    allReceivedBlockTrackers.foreach { _.stop() }
    +    if (checkpointDirectory != null && checkpointDirectory.exists()) {
    +      FileUtils.deleteDirectory(checkpointDirectory)
    +      checkpointDirectory = null
    +    }
    +  }
    +
    +  test("block addition, and block to batch allocation") {
    +    val receivedBlockTracker = createTracker(enableCheckpoint = false)
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
    +
    +    val blockInfos = generateBlockInfos()
    +    blockInfos.map(receivedBlockTracker.addBlock)
    +
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    +    receivedBlockTracker.allocateBlocksToBatch(1)
    +    receivedBlockTracker.getBlocksOfBatch(1, streamId) shouldEqual blockInfos
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) should have size 0
    +    receivedBlockTracker.allocateBlocksToBatch(2)
    +    receivedBlockTracker.getBlocksOfBatch(2, streamId) should have size 0
    +  }
    +
    +  test("block addition, block to batch allocation and cleanup with write ahead log") {
    +    val manualClock = new ManualClock
    +    conf.getInt(
    +      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
    +
    +    // Set the time increment level to twice the rotation interval so that every increment creates
    +    // a new log file
    +    val timeIncrementMillis = 2000L
    +    def incrementTime() {
    +      manualClock.addToTime(timeIncrementMillis)
    +    }
    +
    +    // Generate and add blocks to the given tracker
    +    def addBlockInfos(tracker: ReceivedBlockTracker): Seq[ReceivedBlockInfo] = {
    +      val blockInfos = generateBlockInfos()
    +      blockInfos.map(tracker.addBlock)
    +      blockInfos
    +    }
    +
    +    // Print the data present in the log ahead files in the log directory
    +    def printLogFiles(message: String) {
    +      val fileContents = getWriteAheadLogFiles().map { file =>
    +        (s"\n>>>>> $file: <<<<<\n${getWrittenLogData(file).mkString("\n")}")
    +      }.mkString("\n")
    +      logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
    +    }
    +
    +    // Start tracker and add blocks
    +    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    val blockInfos1 = addBlockInfos(tracker1)
    +    tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Verify whether write ahead log has correct contents
    +    val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
    +    getWrittenLogData() shouldEqual expectedWrittenData1
    +    getWriteAheadLogFiles() should have size 1
    +
    +    // Restart tracker and verify recovered list of unallocated blocks
    +    incrementTime()
    +    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Allocate blocks to batch and verify whether the unallocated blocks got allocated
    +    val batchTime1 = manualClock.currentTime
    +    tracker2.allocateBlocksToBatch(batchTime1)
    +    tracker2.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +
    +    // Add more blocks and allocate to another batch
    +    incrementTime()
    +    val batchTime2 = manualClock.currentTime
    +    val blockInfos2 = addBlockInfos(tracker2)
    +    tracker2.allocateBlocksToBatch(batchTime2)
    +    tracker2.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +
    +    // Verify whether log has correct contents
    +    val expectedWrittenData2 = expectedWrittenData1 ++
    +      Seq(createBatchAllocation(batchTime1, blockInfos1)) ++
    +      blockInfos2.map(BlockAdditionEvent) ++
    +      Seq(createBatchAllocation(batchTime2, blockInfos2))
    +    getWrittenLogData() shouldEqual expectedWrittenData2
    +
    +    // Restart tracker and verify recovered state
    +    incrementTime()
    +    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +    tracker3.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +    tracker3.getUnallocatedBlocks(streamId) shouldBe empty
    +
    +    // Cleanup first batch but not second batch
    +    val oldestLogFile = getWriteAheadLogFiles().head
    +    incrementTime()
    +    tracker3.cleanupOldBatches(batchTime2)
    +
    +    // Verify that the batch allocations have been cleaned, and the act has been written to log
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual Seq.empty
    +    getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
    +
    +    // Verify that at least one log file gets deleted
    +    eventually(timeout(10 seconds), interval(10 millisecond )) {
    --- End diff --
    
    Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61714251
  
      [Test build #22895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22895/consoleFull) for   PR 3026 at commit [`1d704bb`](https://github.com/apache/spark/commit/1d704bbacf5e0b12ecedfa70e61358d171a9b1c0).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696817
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +
    +import scala.Some
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.{implicitConversions, postfixOps}
    +import scala.util.Random
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.scheduler._
    +import org.apache.spark.streaming.util.WriteAheadLogSuite._
    +import org.apache.spark.streaming.util.{WriteAheadLogReader, Clock, ManualClock, SystemClock}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
    +import org.apache.spark.storage.StreamBlockId
    +import scala.Some
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652209
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    --- End diff --
    
    This doc doesn't add much compared with the name itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61348394
  
    **[Test build #22652 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22652/consoleFull)**     for PR 3026 at commit [`47fc1e3`](https://github.com/apache/spark/commit/47fc1e3acb015d119ca9328330418a8afa08d85a)     after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19693518
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    --- End diff --
    
    I was actually thinking of adding a stronger assertion! If the last allocated batch is time T, then you cannot allocate any batch for time <= T


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694469
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -84,28 +90,35 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
       def stop() = synchronized {
         if (!receiverInputStreams.isEmpty && actor != null) {
           // First, stop the receivers
    -      receiverExecutor.stop()
    +      if (!skipReceiverLaunch) receiverExecutor.stop()
     
           // Finally, stop the actor
           ssc.env.actorSystem.stop(actor)
           actor = null
    +      receivedBlockTracker.stop()
           logInfo("ReceiverTracker stopped")
         }
       }
     
    -  /** Return all the blocks received from a receiver. */
    -  def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
    -    val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
    -    logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
    -    receivedBlockInfo.toArray
    +  /** Allocate all unallocated blocks to the given batch. */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = {
    +    if (receiverInputStreams.nonEmpty) {
    +      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    +    }
       }
     
    -  private def getReceivedBlockInfoQueue(streamId: Int) = {
    -    receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
    +  /** Get all the block for batch time . */
    --- End diff --
    
    Nit: extra space before the period.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61185207
  
      [Test build #22571 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22571/consoleFull) for   PR 3026 at commit [`25611d6`](https://github.com/apache/spark/commit/25611d6c29dd4115fc17b4e36bffa33eaac8e2a6).
     * This patch **fails RAT tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class WriteAheadLogBackedBlockRDDPartition(`
      * `class WriteAheadLogBackedBlockRDD[T: ClassTag](`
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceivedBlockTracker(`
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61193354
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22574/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61208208
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22587/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/3026


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61185195
  
      [Test build #22571 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22571/consoleFull) for   PR 3026 at commit [`25611d6`](https://github.com/apache/spark/commit/25611d6c29dd4115fc17b4e36bffa33eaac8e2a6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61201570
  
      [Test build #22587 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22587/consoleFull) for   PR 3026 at commit [`19aec7d`](https://github.com/apache/spark/commit/19aec7d35c51b331b130bc7667619de883ac9f0b).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61727187
  
      [Test build #22895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22895/consoleFull) for   PR 3026 at commit [`1d704bb`](https://github.com/apache/spark/commit/1d704bbacf5e0b12ecedfa70e61358d171a9b1c0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696472
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    --- End diff --
    
    Yeah that is a good idea - I think any checking around the calling of this function would be useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19695044
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +
    +import scala.Some
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.{implicitConversions, postfixOps}
    +import scala.util.Random
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.scheduler._
    +import org.apache.spark.streaming.util.WriteAheadLogSuite._
    +import org.apache.spark.streaming.util.{WriteAheadLogReader, Clock, ManualClock, SystemClock}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
    +import org.apache.spark.storage.StreamBlockId
    +import scala.Some
    --- End diff --
    
    This file needs a bit of import-order cleanup.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694824
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +        val isWriteAheadLogBased = blockStoreResults.forall {
    --- End diff --
    
    Is it the case that either _all_ `blockStoreResults` are WAL results or none are?  Maybe we can add a stronger assertion here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61186625
  
      [Test build #22572 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22572/consoleFull) for   PR 3026 at commit [`cda62ee`](https://github.com/apache/spark/commit/cda62ee40f34bd4b65c657ed4194780248f6ca83).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696931
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    --- End diff --
    
    This is a really dense expression. Can this be broken out into simpler experesssions that make it easier to read?
    
    ```
    val streamsWithBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(_ => true))
        }
        val streamToBlocks = streamsWithBlocks.toMap
        val allocatedBlocks = AllocatedBlocks(streamToBlocks)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61340155
  
      [Test build #22654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22654/consoleFull) for   PR 3026 at commit [`2ee2484`](https://github.com/apache/spark/commit/2ee24842b5a5160bba4af9052406391cfe521b62).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61727194
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22895/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61185191
  
    @pwendell @JoshRosen @harishreedharan 
    This is the final PR of the driver HA core feature. Please take a look! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61207329
  
      [Test build #22585 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22585/consoleFull) for   PR 3026 at commit [`19aec7d`](https://github.com/apache/spark/commit/19aec7d35c51b331b130bc7667619de883ac9f0b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceivedBlockTracker(`
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61743807
  
    @pwendell added comment. Merging this. Thanks @pwendell and @JoshRosen for reviewing this PR and the previous ones for the streaming driver HA. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696873
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +        val isWriteAheadLogBased = blockStoreResults.forall {
    +          _.isInstanceOf[WriteAheadLogBasedStoreResult]
    +        }
    +        if (isWriteAheadLogBased) {
    +          val logSegments = blockStoreResults.map {
    +            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
    +          }.toArray
    +          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
    +            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
    --- End diff --
    
    Yeah it doesnt. I added comment and made it StorageLevel.NONE


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61185212
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22571/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61187338
  
      [Test build #22574 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22574/consoleFull) for   PR 3026 at commit [`f66d277`](https://github.com/apache/spark/commit/f66d277e7d42a869e10f0cbbb1d12e0d8f998384).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61193346
  
      [Test build #22574 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22574/consoleFull) for   PR 3026 at commit [`f66d277`](https://github.com/apache/spark/commit/f66d277e7d42a869e10f0cbbb1d12e0d8f998384).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceivedBlockTracker(`
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694419
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -48,23 +49,28 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, err
      * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
      * this class must be created after all input streams have been added and StreamingContext.start()
      * has been called because it needs the final set of input streams at the time of instantiation.
    + *
    + * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
      */
     private[streaming]
    -class ReceiverTracker(ssc: StreamingContext) extends Logging {
    +class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
     
    -  val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    -  val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
    -  val receiverExecutor = new ReceiverLauncher()
    -  val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
    -  val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
    -    with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
    -  val timeout = AkkaUtils.askTimeout(ssc.conf)
    -  val listenerBus = ssc.scheduler.listenerBus
    +  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    +  private val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
    --- End diff --
    
    could this just be `.toMap()` instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19647630
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -48,23 +49,28 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, err
      * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
      * this class must be created after all input streams have been added and StreamingContext.start()
      * has been called because it needs the final set of input streams at the time of instantiation.
    + *
    + * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
      */
     private[streaming]
    -class ReceiverTracker(ssc: StreamingContext) extends Logging {
    +class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
     
    -  val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    -  val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
    -  val receiverExecutor = new ReceiverLauncher()
    -  val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
    -  val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
    --- End diff --
    
    All the functionality to keep track of received block metadata have been moved from `ReceiverTracker` to `ReceivedBlockTracker`, so that all actions on the block metadata (include block-to-batch allocations) can be logged at a central location.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19653884
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    --- End diff --
    
    Alright!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19699221
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       def stop() {}
     
    -  /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
    +  /**
    +   * Generates RDDs with blocks received by the receiver of this stream. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    -    }
    -  }
    +    val blockRDD = {
     
    -  /** Get information on received blocks. */
    -  private[streaming] def getReceivedBlockInfo(time: Time) = {
    -    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
    +      if (validTime < graph.startTime) {
    +        // If this is called for any time before the start time of the context,
    +        // then this returns an empty RDD. This may happen when recovering from a
    +        // driver failure without any write ahead log to recover pre-failure data.
    +        new BlockRDD[T](ssc.sc, Array.empty)
    +      } else {
    +        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    +        // for this batch
    +        val blockInfos =
    +          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
    --- End diff --
    
    Ignore this. Verified it does not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19653892
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    --- End diff --
    
    I am fine with either.  I will let @JoshRosen chime in and be the tie breaker ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696631
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    --- End diff --
    
    @JoshRosen any opinions one way or the other?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19689273
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    +    synchronized {
    +      if (!timeToAllocatedBlockInfo.contains(batchTime)) {
    +        allocateAllUnallocatedBlocksToBatch(batchTime)
    +      }
    +      timeToAllocatedBlockInfo(batchTime)(streamId)
    +    }
    +  }
    +
    +  /** Check if any blocks are left to be allocated to batches */
    +  def hasUnallocatedReceivedBlocks(): Boolean = synchronized {
    +    !streamIdToUnallocatedBlockInfo.values.forall(_.isEmpty)
    +  }
    +
    +  /** Clean up block information of old batches */
    +  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
    +    assert(cleanupThreshTime.milliseconds < clock.currentTime())
    +    val timesToCleanup = timeToAllocatedBlockInfo.keys.filter { _ < cleanupThreshTime }.toSeq
    +    logInfo("Deleting batches " + timesToCleanup)
    +    writeToLog(BatchCleanup(timesToCleanup))
    +    timeToAllocatedBlockInfo --= timesToCleanup
    +    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
    +    log
    +  }
    +
    +  /** Stop the block tracker */
    +  def stop() {
    +    logManagerOption.foreach { _.stop() }
    +  }
    +
    +  /** Allocate all unallocated blocks to the given batch */
    +  private def allocateAllUnallocatedBlocksToBatch(batchTime: Time): AllocatedBlocks = synchronized {
    +    val allocatedBlockInfos = AllocatedBlocks(streamIds.map { streamId =>
    +      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocations(batchTime, allocatedBlockInfos))
    +    timeToAllocatedBlockInfo(batchTime) = allocatedBlockInfos
    +    allocatedBlockInfos
    +  }
    +
    +  /**
    +   * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
    +   * and allocated block info) prior to failure
    +   */
    +  private def recoverFromWriteAheadLogs(): Unit = synchronized {
    +    logInfo("Recovering from checkpoint")
    +
    +    // Insert the recovered block information
    +    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
    +      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
    +      // println(s"Recovery: Inserting added block $receivedBlockInfo")
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +    }
    +
    +    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    +    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    +    def insertAllocatedBatch(time: Time, allocatedBlocks: AllocatedBlocks) {
    +      logTrace(s"Recovery: Inserting allocated batch for time $time to " +
    +        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      // println(s"Recovery: Inserting allocated batch for time $time to " +
    +      // s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      streamIdToUnallocatedBlockInfo.values.foreach { _.clear() }
    +      timeToAllocatedBlockInfo.put(time, allocatedBlocks)
    +    }
    +
    +    // Cleanup the batch allocations
    +    def cleanupBatches(batchTimes: Seq[Time]) {
    +      logTrace(s"Recovery: Cleaning up batches $batchTimes")
    +      // println(s"Recovery: Cleaning up batches ${batchTimes}")
    --- End diff --
    
    Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652783
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    +    synchronized {
    +      if (!timeToAllocatedBlockInfo.contains(batchTime)) {
    +        allocateAllUnallocatedBlocksToBatch(batchTime)
    +      }
    +      timeToAllocatedBlockInfo(batchTime)(streamId)
    +    }
    +  }
    +
    +  /** Check if any blocks are left to be allocated to batches */
    +  def hasUnallocatedReceivedBlocks(): Boolean = synchronized {
    +    !streamIdToUnallocatedBlockInfo.values.forall(_.isEmpty)
    +  }
    +
    +  /** Clean up block information of old batches */
    +  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
    +    assert(cleanupThreshTime.milliseconds < clock.currentTime())
    +    val timesToCleanup = timeToAllocatedBlockInfo.keys.filter { _ < cleanupThreshTime }.toSeq
    +    logInfo("Deleting batches " + timesToCleanup)
    +    writeToLog(BatchCleanup(timesToCleanup))
    +    timeToAllocatedBlockInfo --= timesToCleanup
    +    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
    +    log
    +  }
    +
    +  /** Stop the block tracker */
    +  def stop() {
    +    logManagerOption.foreach { _.stop() }
    +  }
    +
    +  /** Allocate all unallocated blocks to the given batch */
    +  private def allocateAllUnallocatedBlocksToBatch(batchTime: Time): AllocatedBlocks = synchronized {
    +    val allocatedBlockInfos = AllocatedBlocks(streamIds.map { streamId =>
    +      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocations(batchTime, allocatedBlockInfos))
    +    timeToAllocatedBlockInfo(batchTime) = allocatedBlockInfos
    +    allocatedBlockInfos
    +  }
    +
    +  /**
    +   * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
    +   * and allocated block info) prior to failure
    +   */
    +  private def recoverFromWriteAheadLogs(): Unit = synchronized {
    +    logInfo("Recovering from checkpoint")
    +
    +    // Insert the recovered block information
    +    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
    +      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
    +      // println(s"Recovery: Inserting added block $receivedBlockInfo")
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +    }
    +
    +    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    +    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    +    def insertAllocatedBatch(time: Time, allocatedBlocks: AllocatedBlocks) {
    +      logTrace(s"Recovery: Inserting allocated batch for time $time to " +
    +        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      // println(s"Recovery: Inserting allocated batch for time $time to " +
    +      // s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      streamIdToUnallocatedBlockInfo.values.foreach { _.clear() }
    +      timeToAllocatedBlockInfo.put(time, allocatedBlocks)
    +    }
    +
    +    // Cleanup the batch allocations
    +    def cleanupBatches(batchTimes: Seq[Time]) {
    +      logTrace(s"Recovery: Cleaning up batches $batchTimes")
    +      // println(s"Recovery: Cleaning up batches ${batchTimes}")
    --- End diff --
    
    commented code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19693151
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    +        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    +    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    +    allocatedBlocks
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch. */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them. */
    --- End diff --
    
    Dang, forgot to update, good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61222814
  
    Hey @tdas I did a first pass on this. This makes sense to me overall and seems like a straightforward next step in the design. I had a few comments about naming. The main issue I noticed is that having a single function `getOrAllocateBlocksToBatch` was very hard to reason about and the semantics seemed like they were lost up the call chain. I wonder if it can be improved.
    
    All that said, I was just reviewing this slice of a larger change and I haven't been able yet to sit down and I haven't done a thorough vetting of the larger fault tolerance guarentees (e.g. think about corner cases during failures, etc). I will try to do that tomorrow. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694528
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +        val isWriteAheadLogBased = blockStoreResults.forall {
    +          _.isInstanceOf[WriteAheadLogBasedStoreResult]
    +        }
    +        if (isWriteAheadLogBased) {
    +          val logSegments = blockStoreResults.map {
    +            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
    +          }.toArray
    +          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
    +            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
    +        } else {
    +          new BlockRDD[T](ssc.sc, blockIds)
    +        }
    +      } else {
    +        // If this is called for any time before the start time of the context,
    +        // then this returns an empty RDD. This may happen when recovering from a
    +        // driver failure, a
    --- End diff --
    
    This comment looks incomplete?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19697216
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{SparkException, Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming]
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
    +    streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
    +  }
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  private var lastAllocatedBatchTime: Time = null
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    +      val allocatedBlocks = {
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61348399
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22652/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61339143
  
    This looks good overall. I like the clean-up on the allocation code path. Just left minor comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61325810
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22636/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19693010
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    +        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    +    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    +    allocatedBlocks
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch. */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them. */
    --- End diff --
    
    This comment seems inaccurate, since it looks like this method doesn't auto-allocate blocks to batches.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694650
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    --- End diff --
    
    It looks like the `blockId` here is a `StreamBlockId`, which is a subclass of BlockId, so I don't think that you need this cast.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19653425
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    --- End diff --
    
    It would be good to explain that as a side effect of construction this will attempt to read any existing state in the log contained in the checkpoint directory. Alternatively, to keep the constructor side effect free, what if the caller explicitly called `recoverFromWriteAheadLogs` when it constructs this (and you'd throw an error if someone called that when there were already blocks).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61744377
  
      [Test build #22906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22906/consoleFull) for   PR 3026 at commit [`a8009ed`](https://github.com/apache/spark/commit/a8009ed0f23899202750df9d2213e8bffa01f7f3).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19834651
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       def stop() {}
     
    -  /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
    +  /**
    +   * Generates RDDs with blocks received by the receiver of this stream. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    -    }
    -  }
    +    val blockRDD = {
     
    -  /** Get information on received blocks. */
    -  private[streaming] def getReceivedBlockInfo(time: Time) = {
    -    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
    +      if (validTime < graph.startTime) {
    +        // If this is called for any time before the start time of the context,
    +        // then this returns an empty RDD. This may happen when recovering from a
    +        // driver failure without any write ahead log to recover pre-failure data.
    +        new BlockRDD[T](ssc.sc, Array.empty)
    +      } else {
    +        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    +        // for this batch
    +        val blockInfos =
    +          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
    +        val blockStoreResults = blockInfos.map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +
    +        // Check whether all the results are of the same type
    +        val resultTypes = blockStoreResults.map { _.getClass }.distinct
    +        if (resultTypes.size > 1) {
    +          logWarning("Multiple result types in block information, WAL information will be ignored.")
    +        }
    +
    +        // If all the results are of type WriteAheadLogBasedStoreResult, then create
    +        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
    +        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
    +          val logSegments = blockStoreResults.map {
    +            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
    +          }.toArray
    +          // Since storeInBlockManager = false, the storage level does not matter.
    +          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
    +            blockIds, logSegments, storeInBlockManager = false, StorageLevel.NONE)
    --- End diff --
    
    There can be an additional cost of putting the data back into BM, which is unnecessary for simple workloads where the data is probably going to be used only once. I see your point as well. So what we can do is that we allows the data to be stored in BM only in the serialized form  (so storage level = MEMORY_ONLY_SER). That should be a no-overhead solution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652287
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    --- End diff --
    
    These could be `BlockAdditionEvent`, `BatchCleanupEvent` etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696702
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    +        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    +    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    +    allocatedBlocks
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch. */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    --- End diff --
    
    Is this exposed only for testing? If so, can you note that down?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19684794
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    --- End diff --
    
    Sure - having `LogEvent` somewhere in there would just be helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61322229
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22629/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652330
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    --- End diff --
    
    Minor - but I found the term "Allocation" confusing here and in my mind saying "Assignment" would be more clear. I think of allocation as associated with creation of new data (e.g. allocate more memory). In this case, for a given block, it is first added and then assigned to a particular batch. This is very minor though and maybe other people find Allocation more obvious.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19689268
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    +    synchronized {
    +      if (!timeToAllocatedBlockInfo.contains(batchTime)) {
    +        allocateAllUnallocatedBlocksToBatch(batchTime)
    +      }
    +      timeToAllocatedBlockInfo(batchTime)(streamId)
    +    }
    +  }
    +
    +  /** Check if any blocks are left to be allocated to batches */
    +  def hasUnallocatedReceivedBlocks(): Boolean = synchronized {
    +    !streamIdToUnallocatedBlockInfo.values.forall(_.isEmpty)
    +  }
    +
    +  /** Clean up block information of old batches */
    +  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
    +    assert(cleanupThreshTime.milliseconds < clock.currentTime())
    +    val timesToCleanup = timeToAllocatedBlockInfo.keys.filter { _ < cleanupThreshTime }.toSeq
    +    logInfo("Deleting batches " + timesToCleanup)
    +    writeToLog(BatchCleanup(timesToCleanup))
    +    timeToAllocatedBlockInfo --= timesToCleanup
    +    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
    +    log
    +  }
    +
    +  /** Stop the block tracker */
    +  def stop() {
    +    logManagerOption.foreach { _.stop() }
    +  }
    +
    +  /** Allocate all unallocated blocks to the given batch */
    +  private def allocateAllUnallocatedBlocksToBatch(batchTime: Time): AllocatedBlocks = synchronized {
    +    val allocatedBlockInfos = AllocatedBlocks(streamIds.map { streamId =>
    +      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocations(batchTime, allocatedBlockInfos))
    +    timeToAllocatedBlockInfo(batchTime) = allocatedBlockInfos
    +    allocatedBlockInfos
    +  }
    +
    +  /**
    +   * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
    +   * and allocated block info) prior to failure
    +   */
    +  private def recoverFromWriteAheadLogs(): Unit = synchronized {
    +    logInfo("Recovering from checkpoint")
    +
    +    // Insert the recovered block information
    +    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
    +      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
    +      // println(s"Recovery: Inserting added block $receivedBlockInfo")
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +    }
    +
    +    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    +    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    +    def insertAllocatedBatch(time: Time, allocatedBlocks: AllocatedBlocks) {
    +      logTrace(s"Recovery: Inserting allocated batch for time $time to " +
    +        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      // println(s"Recovery: Inserting allocated batch for time $time to " +
    --- End diff --
    
    Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19653881
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    --- End diff --
    
    How about `ReceivedBlockTrackerLogEvent` ? I am not so sure about giving a such a generic name `LogEvent`; it becomes hard to immediately identify what module such this class is related to. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19698018
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -58,24 +54,45 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       def stop() {}
     
    -  /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
    +  /**
    +   * Generates RDDs with blocks received by the receiver of this stream. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    -    }
    -  }
    +    val blockRDD = {
     
    -  /** Get information on received blocks. */
    -  private[streaming] def getReceivedBlockInfo(time: Time) = {
    -    receivedBlockInfo.get(time).getOrElse(Array.empty[ReceivedBlockInfo])
    +      if (validTime < graph.startTime) {
    +        // If this is called for any time before the start time of the context,
    +        // then this returns an empty RDD. This may happen when recovering from a
    +        // driver failure without any write ahead log to recover pre-failure data.
    +        new BlockRDD[T](ssc.sc, Array.empty)
    +      } else {
    +        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    +        // for this batch
    +        val blockInfos =
    +          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
    --- End diff --
    
    Does the receiver tracker read from HDFS each time getBlocksOfBatch is called (sorry, I don't remember if it does)? If it does, then this call incurs more HDFS reads than required when there are several streams in the same app, correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19846870
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -217,14 +217,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     
       /** Generate jobs and perform checkpoint for the given `time`.  */
       private def generateJobs(time: Time) {
    -    Try(graph.generateJobs(time)) match {
    +    SparkEnv.set(ssc.env)
    --- End diff --
    
    This was added when SparkEnv needed to be set for launching jobs on non-main threads. Since the JobGenerator is background thread which actually submits the jobs, the SparkEnv needed to be set. But since we have removed the whole threadlocal stuff from SparkEnv, this is probably not needed any more.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652476
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    --- End diff --
    
    is there any issue with having all of these in a synchronized block? This seems like it could block for a long time if it goes into an HDFS flush... and the event is processed inside of an actor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696972
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    +        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
    +    timeToAllocatedBlocks(batchTime) = allocatedBlocks
    +    allocatedBlocks
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch. */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    --- End diff --
    
    Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19695521
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.io.File
    +
    +import scala.Some
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.{implicitConversions, postfixOps}
    +import scala.util.Random
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.scheduler._
    +import org.apache.spark.streaming.util.WriteAheadLogSuite._
    +import org.apache.spark.streaming.util.{WriteAheadLogReader, Clock, ManualClock, SystemClock}
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
    +import org.scalatest.concurrent.Eventually._
    +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
    +import org.apache.spark.storage.StreamBlockId
    +import scala.Some
    +import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
    +
    +class ReceivedBlockTrackerSuite
    +  extends FunSuite with BeforeAndAfter with Matchers with Logging {
    +
    +  val conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
    +  conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
    +
    +  val hadoopConf = new Configuration()
    +  val akkaTimeout = 10 seconds
    +  val streamId = 1
    +
    +  var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
    +  var checkpointDirectory: File = null
    +
    +  before {
    +    checkpointDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    allReceivedBlockTrackers.foreach { _.stop() }
    +    if (checkpointDirectory != null && checkpointDirectory.exists()) {
    +      FileUtils.deleteDirectory(checkpointDirectory)
    +      checkpointDirectory = null
    +    }
    +  }
    +
    +  test("block addition, and block to batch allocation") {
    +    val receivedBlockTracker = createTracker(enableCheckpoint = false)
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
    +
    +    val blockInfos = generateBlockInfos()
    +    blockInfos.map(receivedBlockTracker.addBlock)
    +
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    +    receivedBlockTracker.allocateBlocksToBatch(1)
    +    receivedBlockTracker.getBlocksOfBatch(1, streamId) shouldEqual blockInfos
    +    receivedBlockTracker.getUnallocatedBlocks(streamId) should have size 0
    +    receivedBlockTracker.allocateBlocksToBatch(2)
    +    receivedBlockTracker.getBlocksOfBatch(2, streamId) should have size 0
    +  }
    +
    +  test("block addition, block to batch allocation and cleanup with write ahead log") {
    +    val manualClock = new ManualClock
    +    conf.getInt(
    +      "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", -1) should be (1)
    +
    +    // Set the time increment level to twice the rotation interval so that every increment creates
    +    // a new log file
    +    val timeIncrementMillis = 2000L
    +    def incrementTime() {
    +      manualClock.addToTime(timeIncrementMillis)
    +    }
    +
    +    // Generate and add blocks to the given tracker
    +    def addBlockInfos(tracker: ReceivedBlockTracker): Seq[ReceivedBlockInfo] = {
    +      val blockInfos = generateBlockInfos()
    +      blockInfos.map(tracker.addBlock)
    +      blockInfos
    +    }
    +
    +    // Print the data present in the log ahead files in the log directory
    +    def printLogFiles(message: String) {
    +      val fileContents = getWriteAheadLogFiles().map { file =>
    +        (s"\n>>>>> $file: <<<<<\n${getWrittenLogData(file).mkString("\n")}")
    +      }.mkString("\n")
    +      logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
    +    }
    +
    +    // Start tracker and add blocks
    +    val tracker1 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    val blockInfos1 = addBlockInfos(tracker1)
    +    tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Verify whether write ahead log has correct contents
    +    val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
    +    getWrittenLogData() shouldEqual expectedWrittenData1
    +    getWriteAheadLogFiles() should have size 1
    +
    +    // Restart tracker and verify recovered list of unallocated blocks
    +    incrementTime()
    +    val tracker2 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
    +
    +    // Allocate blocks to batch and verify whether the unallocated blocks got allocated
    +    val batchTime1 = manualClock.currentTime
    +    tracker2.allocateBlocksToBatch(batchTime1)
    +    tracker2.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +
    +    // Add more blocks and allocate to another batch
    +    incrementTime()
    +    val batchTime2 = manualClock.currentTime
    +    val blockInfos2 = addBlockInfos(tracker2)
    +    tracker2.allocateBlocksToBatch(batchTime2)
    +    tracker2.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +
    +    // Verify whether log has correct contents
    +    val expectedWrittenData2 = expectedWrittenData1 ++
    +      Seq(createBatchAllocation(batchTime1, blockInfos1)) ++
    +      blockInfos2.map(BlockAdditionEvent) ++
    +      Seq(createBatchAllocation(batchTime2, blockInfos2))
    +    getWrittenLogData() shouldEqual expectedWrittenData2
    +
    +    // Restart tracker and verify recovered state
    +    incrementTime()
    +    val tracker3 = createTracker(enableCheckpoint = true, clock = manualClock)
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual blockInfos1
    +    tracker3.getBlocksOfBatch(batchTime2, streamId) shouldEqual blockInfos2
    +    tracker3.getUnallocatedBlocks(streamId) shouldBe empty
    +
    +    // Cleanup first batch but not second batch
    +    val oldestLogFile = getWriteAheadLogFiles().head
    +    incrementTime()
    +    tracker3.cleanupOldBatches(batchTime2)
    +
    +    // Verify that the batch allocations have been cleaned, and the act has been written to log
    +    tracker3.getBlocksOfBatch(batchTime1, streamId) shouldEqual Seq.empty
    +    getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
    +
    +    // Verify that at least one log file gets deleted
    +    eventually(timeout(10 seconds), interval(10 millisecond )) {
    --- End diff --
    
    Looks like an extra space here after `millisecond`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19697255
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    val allocatedBlocks = AllocatedBlocks(streamIds.map { streamId =>
    --- End diff --
    
    Haha, I had felt the same so had changed it. With your other suggestion incorporated, its cleaner.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19647455
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -40,9 +39,6 @@ import org.apache.spark.SparkException
     abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
       extends InputDStream[T](ssc_) {
     
    -  /** Keeps all received blocks information */
    -  private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]
    -
       /** This is an unique identifier for the network input stream. */
    --- End diff --
    
    All the functionality to keep track of block-to-batch allocations have been moved from `ReceiverInputDStream` to `ReceivedBlockTracker`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61207334
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22585/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652779
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    +    synchronized {
    +      if (!timeToAllocatedBlockInfo.contains(batchTime)) {
    +        allocateAllUnallocatedBlocksToBatch(batchTime)
    +      }
    +      timeToAllocatedBlockInfo(batchTime)(streamId)
    +    }
    +  }
    +
    +  /** Check if any blocks are left to be allocated to batches */
    +  def hasUnallocatedReceivedBlocks(): Boolean = synchronized {
    +    !streamIdToUnallocatedBlockInfo.values.forall(_.isEmpty)
    +  }
    +
    +  /** Clean up block information of old batches */
    +  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
    +    assert(cleanupThreshTime.milliseconds < clock.currentTime())
    +    val timesToCleanup = timeToAllocatedBlockInfo.keys.filter { _ < cleanupThreshTime }.toSeq
    +    logInfo("Deleting batches " + timesToCleanup)
    +    writeToLog(BatchCleanup(timesToCleanup))
    +    timeToAllocatedBlockInfo --= timesToCleanup
    +    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
    +    log
    +  }
    +
    +  /** Stop the block tracker */
    +  def stop() {
    +    logManagerOption.foreach { _.stop() }
    +  }
    +
    +  /** Allocate all unallocated blocks to the given batch */
    +  private def allocateAllUnallocatedBlocksToBatch(batchTime: Time): AllocatedBlocks = synchronized {
    +    val allocatedBlockInfos = AllocatedBlocks(streamIds.map { streamId =>
    +      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    +    }.toMap)
    +    writeToLog(BatchAllocations(batchTime, allocatedBlockInfos))
    +    timeToAllocatedBlockInfo(batchTime) = allocatedBlockInfos
    +    allocatedBlockInfos
    +  }
    +
    +  /**
    +   * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
    +   * and allocated block info) prior to failure
    +   */
    +  private def recoverFromWriteAheadLogs(): Unit = synchronized {
    +    logInfo("Recovering from checkpoint")
    +
    +    // Insert the recovered block information
    +    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
    +      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
    +      // println(s"Recovery: Inserting added block $receivedBlockInfo")
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +    }
    +
    +    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    +    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    +    def insertAllocatedBatch(time: Time, allocatedBlocks: AllocatedBlocks) {
    +      logTrace(s"Recovery: Inserting allocated batch for time $time to " +
    +        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    +      // println(s"Recovery: Inserting allocated batch for time $time to " +
    --- End diff --
    
    extra commented code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61208200
  
      [Test build #22587 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22587/consoleFull) for   PR 3026 at commit [`19aec7d`](https://github.com/apache/spark/commit/19aec7d35c51b331b130bc7667619de883ac9f0b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceivedBlockTracker(`
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19653928
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    --- End diff --
    
    Okay, let me think about this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694991
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +        val isWriteAheadLogBased = blockStoreResults.forall {
    +          _.isInstanceOf[WriteAheadLogBasedStoreResult]
    +        }
    +        if (isWriteAheadLogBased) {
    +          val logSegments = blockStoreResults.map {
    +            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
    +          }.toArray
    +          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
    +            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
    --- End diff --
    
    Since `storeInBlockManager = false`, I guess it doesn't matter what we pass for StorageLevel here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652215
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    --- End diff --
    
    btw - just suggesting maybe the doc can be removed as it's self evident


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61201112
  
      [Test build #22585 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22585/consoleFull) for   PR 3026 at commit [`19aec7d`](https://github.com/apache/spark/commit/19aec7d35c51b331b130bc7667619de883ac9f0b).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61186763
  
      [Test build #22572 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22572/consoleFull) for   PR 3026 at commit [`cda62ee`](https://github.com/apache/spark/commit/cda62ee40f34bd4b65c657ed4194780248f6ca83).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceivedBlockTracker(`
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652282
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    --- End diff --
    
    What about naming this something like `LogEvent`? It wasn't clear to me when I looked at this what it meant by "Action".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61186766
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22572/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61339328
  
      [Test build #22652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22652/consoleFull) for   PR 3026 at commit [`47fc1e3`](https://github.com/apache/spark/commit/47fc1e3acb015d119ca9328330418a8afa08d85a).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696729
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    +        val isWriteAheadLogBased = blockStoreResults.forall {
    --- End diff --
    
    It should ideally all WAL based or none WAL based. We could make a stronger assertion here, but we risk breaking stuff if something goes wrong. We can always safely ignore the WAL info and make simple BlockRDDs, which would allow streaming to work, but not recover. In either case, if some of the blocks are WAL based, and others not, we cannot possible recover. 
    
    This is not a very strong argument though for not putting the assertion. How about just logging a warning that something may be going wrong if multiple types are mixed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19837078
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
    @@ -217,14 +217,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     
       /** Generate jobs and perform checkpoint for the given `time`.  */
       private def generateJobs(time: Time) {
    -    Try(graph.generateJobs(time)) match {
    +    SparkEnv.set(ssc.env)
    --- End diff --
    
    Just wondering - why does this need to be set here? Who consumes this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61345812
  
      [Test build #22654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22654/consoleFull) for   PR 3026 at commit [`2ee2484`](https://github.com/apache/spark/commit/2ee24842b5a5160bba4af9052406391cfe521b62).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61325805
  
      [Test build #22636 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22636/consoleFull) for   PR 3026 at commit [`af63655`](https://github.com/apache/spark/commit/af63655fdf0e370ceab8069780b5f83c835421f0).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19697044
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{SparkException, Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming]
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
    +    streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
    +  }
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  private var lastAllocatedBatchTime: Time = null
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    +   * This event will get written to the write ahead log (if enabled).
    +   */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    +    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    +      val allocatedBlocks = {
    --- End diff --
    
    We don't usually do these type of block expressions in Spark - could you just assign an intermediate variable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61325625
  
      [Test build #22636 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22636/consoleFull) for   PR 3026 at commit [`af63655`](https://github.com/apache/spark/commit/af63655fdf0e370ceab8069780b5f83c835421f0).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61750952
  
      [Test build #22906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22906/consoleFull) for   PR 3026 at commit [`a8009ed`](https://github.com/apache/spark/commit/a8009ed0f23899202750df9d2213e8bffa01f7f3).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) `
      * `class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19689226
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    --- End diff --
    
    Changed, as discussed offline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19652761
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAddition(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError("Error adding block " + receivedBlockInfo, e)
    +        false
    +    }
    +  }
    +
    +  /** Get blocks that have been added but not yet allocated to any batch */
    +  def getUnallocatedBlocks(streamId: Int): Seq[ReceivedBlockInfo] = synchronized {
    +    getReceivedBlockQueue(streamId).toSeq
    +  } 
    +
    +  /** Get the blocks allocated to a batch, or allocate blocks to the batch and then get them */
    +  def getOrAllocateBlocksToBatch(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    --- End diff --
    
    Would it be possible to write this in a different way? It's hard to reason about interactions here because the behavior is defined entirely by the order and timing of invocations. This is used by both the ReceiverInputDStream and the JobGenerator... do those both expect to generate "new" allocations, or could one of them instead use a simpler API such as "getBlocksForBatch", where you assume that the batch already has blocks.
    
    The semantics also get hidden up the call chain. `ReceiverTracker` has a function `getReceivedBlocks` that doesn't say anything about the fact that it might create new assignments.
    
    If there is any way to change the code to make it more explicit when new allocations are happening that would be helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61750955
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22906/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61718197
  
    Hey @tdas this LGTM. The only question was around setting of the SparkEnv... it might be good to document what consumes that downstream.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694604
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -84,28 +90,35 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
       def stop() = synchronized {
         if (!receiverInputStreams.isEmpty && actor != null) {
           // First, stop the receivers
    -      receiverExecutor.stop()
    +      if (!skipReceiverLaunch) receiverExecutor.stop()
     
           // Finally, stop the actor
           ssc.env.actorSystem.stop(actor)
           actor = null
    +      receivedBlockTracker.stop()
           logInfo("ReceiverTracker stopped")
         }
       }
     
    -  /** Return all the blocks received from a receiver. */
    -  def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
    -    val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
    -    logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
    -    receivedBlockInfo.toArray
    +  /** Allocate all unallocated blocks to the given batch. */
    +  def allocateBlocksToBatch(batchTime: Time): Unit = {
    +    if (receiverInputStreams.nonEmpty) {
    +      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    +    }
       }
     
    -  private def getReceivedBlockInfoQueue(streamId: Int) = {
    -    receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
    +  /** Get all the block for batch time . */
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3026#issuecomment-61200969
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19689220
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.storage.StreamBlockId
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any action done in the ReceivedBlockTracker */
    +private[streaming] sealed trait ReceivedBlockTrackerAction
    +
    +private[streaming] case class BlockAddition(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchAllocations(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerAction
    +private[streaming] case class BatchCleanup(times: Seq[Time])
    +  extends ReceivedBlockTrackerAction
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def apply(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log,
    + * so that the state of the tracker (received blocks and block-to-batch allocations)
    + * can be recovered after driver failure.
    + */
    +private[streaming]
    +class ReceivedBlockTracker(
    +    conf: SparkConf, hadoopConf: Configuration, streamIds: Seq[Int], clock: Clock,
    +    checkpointDirOption: Option[String]) extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockInfo = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlockInfo = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    --- End diff --
    
    This is a fair point. I wouldnt be surprised if this turns out to be a problem. Just wondering whether to address that complexity in this PR, or do a bit more real HDFS testing next week and then change this. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19693094
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.scheduler
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable
    +import scala.language.implicitConversions
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.util.{Clock, WriteAheadLogManager}
    +import org.apache.spark.util.Utils
    +
    +/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
    +private[streaming] sealed trait ReceivedBlockTrackerLogEvent
    +
    +private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
    +  extends ReceivedBlockTrackerLogEvent
    +private[streaming] case class BatchCleanupEvent(times: Seq[Time])
    +  extends ReceivedBlockTrackerLogEvent
    +
    +
    +/** Class representing the blocks of all the streams allocated to a batch */
    +private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
    +  def getBlockForStream(streamId: Int) = streamIdToAllocatedBlocks(streamId)
    +}
    +
    +/**
    + * Class that keep track of all the received blocks, and allocate them to batches
    + * when required. All actions taken by this class can be saved to a write ahead log
    + * (if a checkpoint directory has been provided), so that the state of the tracker
    + * (received blocks and block-to-batch allocations) can be recovered after driver failure.
    + *
    + * Note that when any instance of this class is created with a checkpoint directory,
    + * it will try reading events from logs in the directory.
    + */
    +private[streaming] class ReceivedBlockTracker(
    +    conf: SparkConf,
    +    hadoopConf: Configuration,
    +    streamIds: Seq[Int],
    +    clock: Clock,
    +    checkpointDirOption: Option[String])
    +  extends Logging {
    +
    +  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
    +  
    +  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
    +  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
    +
    +  private val logManagerRollingIntervalSecs = conf.getInt(
    +    "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
    +  private val logManagerOption = checkpointDirOption.map { checkpointDir =>
    +    new WriteAheadLogManager(
    +      ReceivedBlockTracker.checkpointDirToLogDir(checkpointDir),
    +      hadoopConf,
    +      rollingIntervalSecs = logManagerRollingIntervalSecs,
    +      callerName = "ReceivedBlockHandlerMaster",
    +      clock = clock
    +    )
    +  }
    +
    +  // Recover block information from write ahead logs
    +  recoverFromWriteAheadLogs()
    +
    +  /** Add received block. This event will get written to the write ahead log (if enabled). */
    +  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    +    try {
    +      writeToLog(BlockAdditionEvent(receivedBlockInfo))
    +      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    +      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    +        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    +      true
    +    } catch {
    +      case e: Exception =>
    +        logError(s"Error adding block $receivedBlockInfo", e)
    +        false
    +    }
    +  }
    +
    +  /**
    +   * Allocate all unallocated blocks to the given batch.
    --- End diff --
    
    I guess there's an implicit contract that the user will only call `allocateBlocksToBatch()` once for a given batch time.  If we're being super-defensive against mis-use, I suppose we could add an assertion for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19696557
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala ---
    @@ -60,22 +56,35 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     
       /** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */
       override def compute(validTime: Time): Option[RDD[T]] = {
    -    // If this is called for any time before the start time of the context,
    -    // then this returns an empty RDD. This may happen when recovering from a
    -    // master failure
    -    if (validTime >= graph.startTime) {
    -      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
    -      receivedBlockInfo(validTime) = blockInfo
    -      val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
    -      Some(new BlockRDD[T](ssc.sc, blockIds))
    -    } else {
    -      Some(new BlockRDD[T](ssc.sc, Array.empty))
    +    val blockRDD = {
    +      if (validTime >= graph.startTime) {
    +        val blockStoreResults = getReceivedBlockInfo(validTime).map { _.blockStoreResult }
    +        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
    --- End diff --
    
    You need because `Array[StreamBlockIds]` cannot be assigned to a variable of type `Array[BlockIds]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4029][Streaming] Update streaming drive...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3026#discussion_r19694569
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala ---
    @@ -48,23 +49,28 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, err
      * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
      * this class must be created after all input streams have been added and StreamingContext.start()
      * has been called because it needs the final set of input streams at the time of instantiation.
    + *
    + * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
      */
     private[streaming]
    -class ReceiverTracker(ssc: StreamingContext) extends Logging {
    +class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
     
    -  val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    -  val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
    -  val receiverExecutor = new ReceiverLauncher()
    -  val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
    -  val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
    -    with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
    -  val timeout = AkkaUtils.askTimeout(ssc.conf)
    -  val listenerBus = ssc.scheduler.listenerBus
    +  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    +  private val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): _*)
    --- End diff --
    
    Just removed this. Realized that it is not needed. Replaced it with just a sequence of stream ids.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org