You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2014/10/21 22:06:07 UTC

[GitHub] spark pull request: [SPARK-4026][Streaming] synchronously write re...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/2882

    [SPARK-4026][Streaming] synchronously write received data to HDFS and recover on driver failure

    As part of the effort to avoid data loss on Spark Streaming driver failure, we want to implement a write ahead log that can write received data to HDFS. This allows the received data to be persist across driver failures. So when the streaming driver is restarted, it can find and reprocess all the data that were received but not processed.
    
    This was primarily implemented by @harishreedharan. This is still WIP, as he is going to improve the unitests by using HDFS mini cluster.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark driver-ha-wal

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2882
    
----
commit 172358de10a61f296e52fa347c2e40aa87490ecf
Author: Tathagata Das <ta...@gmail.com>
Date:   2014-10-21T02:52:55Z

    Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working

commit 5182ffb3053a143f221f1e56ed21e2461b4d9e4f
Author: Hari Shreedharan <hs...@apache.org>
Date:   2014-10-21T19:59:38Z

    Added documentation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60040386
  
    This is nice work overall.  I like the thorough tests, especially the decoupling of the writer / reader tests so that you can test the components separately and as part of the complete log manager system.  I left a few comments on the code, but I have a couple of high-level questions, too:
    
    When is it safe to rotate / delete old logs?  In general, it seems like safe log compaction / deletion is application-specific and that a simple time-based mechanism might be unsafe.
    
    What would happen if Spark Streaming crashed, stayed down for some multiple of the threshold time, then recovered?  Would we read this old portion of the log or would it be deleted / ignored?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195339
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    --- End diff --
    
    Minor nitpick, but it looks like the tests use a few different techniques for creating files in the temp directory:
    
    - `val file = new File(tempDirectory, Random.nextString(10))`
    - `val file = File.createTempFile("TestSequentialReads", "", tempDirectory)`
    - `val file = new File(tempDirectory, "TestWriter")`
    
    For consistency, we should pick one method.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19190251
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    Seems we also have the same case class `FileSegment` in core/storage module. can we just use that one, or we have to build our own?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195658
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    val segments = dataToWrite.map(data => writer.write(data))
    +    writer.close()
    +    val writtenData = readDataManually(file, segments)
    +    assert(writtenData.toArray === dataToWrite.toArray)
    +  }
    +
    +  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    dataToWrite.foreach { data =>
    +      val segment = writer.write(data)
    +      assert(readDataManually(file, Seq(segment)).head === data)
    +    }
    +    writer.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data") {
    +    // Write data manually for testing the sequential reader
    +    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    writeDataManually(writtenData, file)
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    val readData = reader.toSeq.map(byteBufferToString)
    +    assert(readData.toList === writtenData.toList)
    +    assert(reader.hasNext === false)
    +    intercept[Exception] {
    +      reader.next()
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data written with writer") {
    +    // Write data manually for testing the sequential reader
    +    val file = new File(tempDirectory, "TestWriter")
    +    val dataToWrite = generateRandomData()
    +    writeDataUsingWriter(file, dataToWrite)
    +    val iter = dataToWrite.iterator
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    reader.foreach { byteBuffer =>
    +      assert(byteBufferToString(byteBuffer) === iter.next())
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader") {
    +    // Write data manually for testing the random reader
    +    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    val segments = writeDataManually(writtenData, file)
    +
    +    // Get a random order of these segments and read them back
    +    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    writtenDataAndSegments.foreach { case (data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
    +    // Write data using writer for testing the random reader
    +    val file = new File(tempDirectory, "TestRandomReads")
    +    val data = generateRandomData()
    +    val segments = writeDataUsingWriter(file, data)
    +
    +    // Read a random sequence of segments and verify read data
    +    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    dataAndSegments.foreach { case(data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogManager - write rotating logs") {
    +    // Write data using manager
    +    val dataToWrite = generateRandomData(10)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +
    +    // Read data manually to verify the written data
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val writtenData = logFiles.flatMap { file => readDataManually(file) }
    +    assert(writtenData.toList === dataToWrite.toList)
    +  }
    +
    +  test("WriteAheadLogManager - read rotating logs") {
    +    // Write data manually for testing reading through manager
    +    val writtenData = (1 to 10).map { i =>
    +      val data = generateRandomData(10)
    +      val file = new File(tempDirectory, s"log-$i-${i + 1}")
    +      writeDataManually(data, file)
    +      data
    +    }.flatten
    +
    +    // Read data using manager and verify
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(readData.toList === writtenData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - recover past logs when creating new manager") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(dataToWrite.toList === readData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - cleanup old logs") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    dataToWrite.foreach { item =>
    +      fakeClock.addToTime(500) // half second for each
    +      manager.writeToLog(item)
    +    }
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
    +    eventually(timeout(1 second), interval(10 milliseconds)) {
    +      assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
    +    }
    +  }
    +
    +  // TODO (Hari, TD): Test different failure conditions of writers and readers.
    +  //  - Failure while reading incomplete/corrupt file
    +}
    +
    +object WriteAheadLogSuite {
    +
    +  private val hadoopConf = new Configuration()
    +
    +  /**
    +   * Write data to the file and returns the an array of the bytes written.
    +   * This is used to test the WAL reader independently of the WAL writer.
    +   */
    +  def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = {
    +    val segments = new ArrayBuffer[FileSegment]()
    +    val writer = new RandomAccessFile(file, "rw")
    +    data.foreach { item =>
    +      val offset = writer.getFilePointer()
    +      val bytes = Utils.serialize(item)
    +      writer.writeInt(bytes.size)
    +      writer.write(bytes)
    +      segments += FileSegment(file.toString, offset, bytes.size)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = {
    +    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
    +    val segments = data.map {
    +      item => writer.write(item)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingManager(logDirectory: File, data: Seq[String]) {
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    data.foreach { item =>
    +      fakeClock.addToTime(500)
    +      manager.writeToLog(item)
    +    }
    +    manager.stop()
    +  }
    +
    +  /**
    +   * Read data from the given segments of log file and returns the read list of byte buffers.
    +   * This is used to test the WAL writer independently of the WAL reader.
    +   */
    +  def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
    +    val reader = new RandomAccessFile(file, "r")
    +    segments.map { x =>
    +      reader.seek(x.offset)
    +      val data = new Array[Byte](x.length)
    +      reader.readInt()
    +      reader.readFully(data)
    +      Utils.deserialize[String](data)
    +    }
    +  }
    +
    +  def readDataManually(file: File): Seq[String] = {
    +    val reader = new DataInputStream(new FileInputStream(file))
    +    val buffer = new ArrayBuffer[String]
    +    try {
    +      while (reader.available > 0) {
    +        val length = reader.readInt()
    +        val bytes = new Array[Byte](length)
    +        reader.read(bytes)
    +        buffer += Utils.deserialize[String](bytes)
    +      }
    +    } finally {
    +      reader.close()
    +    }
    +    buffer
    +  }
    +
    +  def readDataUsingManager(logDirectory: File): Seq[String] = {
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      callerName = "WriteAheadLogSuite")
    +    val data = manager.readFromLog().map(byteBufferToString).toSeq
    +    manager.stop()
    +    data
    +  }
    +
    +  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
    +    (1 to numItems).map { _.toString }
    +  }
    +
    +  def getLogFilesInDirectory(directory: File): Seq[File] = {
    +    if (directory.exists) {
    +      directory.listFiles().filter(_.getName().startsWith("log-"))
    +        .sortBy(_.getName.split("-")(1).toLong)
    +    } else {
    +      Seq.empty
    +    }
    +  }
    +
    +  def printData(data: Seq[String]) {
    --- End diff --
    
    This is unused.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60219269
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22068/consoleFull) for   PR 2882 at commit [`9514dc8`](https://github.com/apache/spark/commit/9514dc833c9c30be12eeb64fb4580c2e6f1adb4f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60431494
  
    Cool! Thanks for check with @cmccabe. Merging this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195551
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +
    +    val dfsPath = new Path(path)
    +    val dfs =
    +      this.synchronized {
    +        dfsPath.getFileSystem(conf)
    +      }
    +    // If the file exists and we have append support, append instead of creating a new file
    +    val stream: FSDataOutputStream = {
    +      if (dfs.isFile(dfsPath)) {
    +        if (conf.getBoolean("hdfs.append.support", false)) {
    +          dfs.append(dfsPath)
    +        } else {
    +          throw new IllegalStateException("File exists and there is no append support!")
    +        }
    +      } else {
    +        dfs.create(dfsPath)
    +      }
    +    }
    +    stream
    +  }
    +
    +  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    +    val dfsPath = new Path(path)
    +    val dfs = this.synchronized {
    +      dfsPath.getFileSystem(conf)
    +    }
    +    val instream = dfs.open(dfsPath)
    +    instream
    +  }
    +
    +  def checkState(state: Boolean, errorMsg: => String) {
    +    if(!state) {
    +      throw new IllegalStateException(errorMsg)
    +    }
    +  }
    +
    +  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    +    val dfsPath = new Path(path)
    +    val dfs =
    --- End diff --
    
    Same here; I'd move the next line onto this line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60171298
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22046/consoleFull) for   PR 2882 at commit [`ef8db09`](https://github.com/apache/spark/commit/ef8db09075ab6d7e29a9e988bb83af16f3c553ca).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60177673
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22049/consoleFull) for   PR 2882 at commit [`eb356ca`](https://github.com/apache/spark/commit/eb356caa2bfc2ea580f656c979ce372cffe91195).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60338152
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22103/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19197745
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    --- End diff --
    
    Tried, for some reason cant do it. Iterator does not have flatten. 
    http://www.scala-lang.org/api/current/index.html#scala.collection.Iterator


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318068
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    // If the file exists and we have append support, append instead of creating a new file
    +    val stream: FSDataOutputStream = {
    +      if (dfs.isFile(dfsPath)) {
    +        if (conf.getBoolean("hdfs.append.support", false)) {
    +          dfs.append(dfsPath)
    +        } else {
    +          throw new IllegalStateException("File exists and there is no append support!")
    +        }
    +      } else {
    +        dfs.create(dfsPath)
    +      }
    +    }
    +    stream
    +  }
    +
    +  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val instream = dfs.open(dfsPath)
    +    instream
    +  }
    +
    +  def checkState(state: Boolean, errorMsg: => String) {
    +    if (!state) {
    +      throw new IllegalStateException(errorMsg)
    +    }
    +  }
    +
    +  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val fileStatus = dfs.getFileStatus(dfsPath)
    +    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
    +    blockLocs.map(_.flatMap(_.getHosts))
    +  }
    +
    +  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
    +    val fs = path.getFileSystem(conf)
    --- End diff --
    
    Based on the old comment, does this need synchronization?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60213859
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22068/consoleFull) for   PR 2882 at commit [`9514dc8`](https://github.com/apache/spark/commit/9514dc833c9c30be12eeb64fb4580c2e6f1adb4f).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60184378
  
    Even after sbt clean I am getting the same error:
    ```[error] /Users/hshreedharan/work/spark-mvn/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:34: object MiniDFSCluster is not a member of package org.apache.hadoop.hdfs
    [error] import org.apache.hadoop.hdfs.MiniDFSCluster
    [error]        ^
    [error] /Users/hshreedharan/work/spark-mvn/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala:46: not found: type MiniDFSCluster
    [error]   val cluster = new MiniDFSCluster(new Configuration, 2, true, null)
    [error]                     ^
    [info] Resolving org.fusesource.jansi#jansi;1.4 ...
    [error] two errors found```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60004008
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22006/consoleFull) for   PR 2882 at commit [`4ab602a`](https://github.com/apache/spark/commit/4ab602a0074a2144d33367229358c19d079798d8).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19197947
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    +    val uri = new URI(path)
    +    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
    +    val isDefaultLocal = defaultFs == null || defaultFs == "file"
    +
    +    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
    +      assert(!new File(uri.getPath).exists)
    +      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
    +    } else {
    +      Right(HdfsUtils.getOutputStream(path, hadoopConf))
    +    }
    +  }
    +
    +  private lazy val hadoopFlushMethod = {
    +    val cls = classOf[FSDataOutputStream]
    +    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
    --- End diff --
    
    Credit goes to Colin McCabe who wrote this line. 
    https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/util/FileLogger.scala#L106
    Stole from there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193881
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{Closeable, EOFException}
    +import java.nio.ByteBuffer
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.Logging
    +
    +/**
    + * A reader for reading write ahead log files written using
    + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
    + * the records (bytebuffers) in the log file sequentially and return them as an
    + * iterator of bytebuffers.
    + */
    +private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
    +  extends Iterator[ByteBuffer] with Closeable with Logging {
    +
    +  private val instream = HdfsUtils.getInputStream(path, conf)
    +  private var closed = false
    +  private var nextItem: Option[ByteBuffer] = None
    +
    +  override def hasNext: Boolean = synchronized {
    +    if (closed) {
    +      return false
    +    }
    +
    +    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
    +      true
    +    } else {
    +      try {
    +        val length = instream.readInt()
    +        val buffer = new Array[Byte](length)
    +        instream.readFully(buffer)
    +        nextItem = Some(ByteBuffer.wrap(buffer))
    +        logTrace("Read next item " + nextItem.get)
    +        true
    +      } catch {
    +        case e: EOFException =>
    +          logDebug("Error reading next item, EOF reached", e)
    +          close()
    +          false
    +        case e: Exception =>
    +          logDebug("Error reading next item, EOF reached", e)
    --- End diff --
    
    This error message should probably be different in order to distinguish it from the EOFException case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19197871
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    +  }
    +
    +  /**
    +   * Delete the log files that are older than the threshold time.
    +   *
    +   * Its important to note that the threshold time is based on the time stamps used in the log
    +   * files, which is usually based on the local system time. So if there is coordination necessary
    +   * between the node calculating the threshTime (say, driver node), and the local system time
    +   * (say, worker node), the caller has to take account of possible time skew.
    +   */
    +  def cleanupOldLogs(threshTime: Long): Unit = {
    +    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    +      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    +
    +    def deleteFiles() {
    +      oldLogFiles.foreach { logInfo =>
    +        try {
    +          val path = new Path(logInfo.path)
    +          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
    +          fs.delete(path, true)
    +          synchronized { pastLogs -= logInfo }
    +          logDebug(s"Cleared log file $logInfo")
    +        } catch {
    +          case ex: Exception =>
    +            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    +        }
    +      }
    +      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    +    }
    +    if (!executionContext.isShutdown) {
    +      Future { deleteFiles() }
    +    }
    +  }
    +
    +  /** Stop the manager, close any open log writer */
    +  def stop(): Unit = synchronized {
    +    if (currentLogWriter != null) {
    +      currentLogWriter.close()
    +    }
    +    executionContext.shutdown()
    +    logInfo("Stopped write ahead log manager")
    +  }
    +
    +  /** Get the current log writer while taking care of rotation */
    +  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    +    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    +      resetWriter()
    +      if (currentLogPath != null) {
    +        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
    +      }
    +      currentLogWriterStartTime = currentTime
    +      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    +      val newLogPath = new Path(logDirectory,
    +        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    +      currentLogPath = newLogPath.toString
    +      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    +    }
    +    currentLogWriter
    +  }
    +
    +  /** Initialize the log directory or recover existing logs inside the directory */
    +  private def initializeOrRecover(): Unit = synchronized {
    +    val logDirectoryPath = new Path(logDirectory)
    +    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
    +
    +    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    +      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
    +      pastLogs.clear()
    +      pastLogs ++= logFileInfo
    +      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
    +      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    +    } else {
    +      fileSystem.mkdirs(logDirectoryPath,
    +        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
    --- End diff --
    
    I copied the permissions from a similar functionality
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala#L145


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60338151
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22103/consoleFull) for   PR 2882 at commit [`e4bee20`](https://github.com/apache/spark/commit/e4bee2065293d7373c43fe5636dd9971dede257e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60286438
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22075/consoleFull) for   PR 2882 at commit [`d29fddd`](https://github.com/apache/spark/commit/d29fddd880fd7efec8ed05017a12600bcb2aa829).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60286445
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22075/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318676
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    // If the file exists and we have append support, append instead of creating a new file
    +    val stream: FSDataOutputStream = {
    +      if (dfs.isFile(dfsPath)) {
    +        if (conf.getBoolean("hdfs.append.support", false)) {
    +          dfs.append(dfsPath)
    +        } else {
    +          throw new IllegalStateException("File exists and there is no append support!")
    +        }
    +      } else {
    +        dfs.create(dfsPath)
    +      }
    +    }
    +    stream
    +  }
    +
    +  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val instream = dfs.open(dfsPath)
    +    instream
    +  }
    +
    +  def checkState(state: Boolean, errorMsg: => String) {
    +    if (!state) {
    +      throw new IllegalStateException(errorMsg)
    +    }
    +  }
    +
    +  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val fileStatus = dfs.getFileStatus(dfsPath)
    +    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
    +    blockLocs.map(_.flatMap(_.getHosts))
    +  }
    +
    +  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
    +    val fs = path.getFileSystem(conf)
    --- End diff --
    
    @aarondav, is this that same file system issue that you mentioned to me?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60180202
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22049/consoleFull) for   PR 2882 at commit [`eb356ca`](https://github.com/apache/spark/commit/eb356caa2bfc2ea580f656c979ce372cffe91195).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193639
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    --- End diff --
    
    I think that `succeeded` implies `fileSegment != null` and vice-versa, so you probably don't need this variable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19241867
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    +  }
    +
    +  /**
    +   * Delete the log files that are older than the threshold time.
    +   *
    +   * Its important to note that the threshold time is based on the time stamps used in the log
    +   * files, which is usually based on the local system time. So if there is coordination necessary
    +   * between the node calculating the threshTime (say, driver node), and the local system time
    +   * (say, worker node), the caller has to take account of possible time skew.
    +   */
    +  def cleanupOldLogs(threshTime: Long): Unit = {
    +    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    +      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    +
    +    def deleteFiles() {
    +      oldLogFiles.foreach { logInfo =>
    +        try {
    +          val path = new Path(logInfo.path)
    +          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
    +          fs.delete(path, true)
    +          synchronized { pastLogs -= logInfo }
    +          logDebug(s"Cleared log file $logInfo")
    +        } catch {
    +          case ex: Exception =>
    +            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    +        }
    +      }
    +      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    +    }
    +    if (!executionContext.isShutdown) {
    +      Future { deleteFiles() }
    +    }
    +  }
    +
    +  /** Stop the manager, close any open log writer */
    +  def stop(): Unit = synchronized {
    +    if (currentLogWriter != null) {
    +      currentLogWriter.close()
    +    }
    +    executionContext.shutdown()
    +    logInfo("Stopped write ahead log manager")
    +  }
    +
    +  /** Get the current log writer while taking care of rotation */
    +  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    +    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    +      resetWriter()
    +      if (currentLogPath != null) {
    +        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
    +      }
    +      currentLogWriterStartTime = currentTime
    +      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    +      val newLogPath = new Path(logDirectory,
    +        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    +      currentLogPath = newLogPath.toString
    +      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    +    }
    +    currentLogWriter
    +  }
    +
    +  /** Initialize the log directory or recover existing logs inside the directory */
    +  private def initializeOrRecover(): Unit = synchronized {
    +    val logDirectoryPath = new Path(logDirectory)
    +    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
    +
    +    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    +      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
    +      pastLogs.clear()
    +      pastLogs ++= logFileInfo
    +      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
    +      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    +    } else {
    +      fileSystem.mkdirs(logDirectoryPath,
    +        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
    --- End diff --
    
    For now I am removing the mkdirs call. When running on minicluster it fails. Also we don't actually need to create a directory with HDFS. If we simply create a file in the directory, HDFS will create the directory with perms for the current user to read/write.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195267
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    --- End diff --
    
    Yep. And for all tests, we are just going to use Hadoop Minicluster anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60332121
  
    This looks good to me!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] synchronously write re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-59990368
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22002/consoleFull) for   PR 2882 at commit [`5182ffb`](https://github.com/apache/spark/commit/5182ffb3053a143f221f1e56ed21e2461b4d9e4f).
     * This patch **fails RAT tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19197515
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    --- End diff --
    
    Yes, I agree. Will made it more clear. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19197661
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    --- End diff --
    
    I had thought so, and I usually do it. But for reading I felt its easier to understand `if (!succeeded ...` . Am happy to change it.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19242178
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    val segments = dataToWrite.map(data => writer.write(data))
    +    writer.close()
    +    val writtenData = readDataManually(file, segments)
    +    assert(writtenData.toArray === dataToWrite.toArray)
    +  }
    +
    +  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    dataToWrite.foreach { data =>
    +      val segment = writer.write(data)
    +      assert(readDataManually(file, Seq(segment)).head === data)
    +    }
    +    writer.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data") {
    +    // Write data manually for testing the sequential reader
    +    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    writeDataManually(writtenData, file)
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    val readData = reader.toSeq.map(byteBufferToString)
    +    assert(readData.toList === writtenData.toList)
    +    assert(reader.hasNext === false)
    +    intercept[Exception] {
    +      reader.next()
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data written with writer") {
    +    // Write data manually for testing the sequential reader
    +    val file = new File(tempDirectory, "TestWriter")
    +    val dataToWrite = generateRandomData()
    +    writeDataUsingWriter(file, dataToWrite)
    +    val iter = dataToWrite.iterator
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    reader.foreach { byteBuffer =>
    +      assert(byteBufferToString(byteBuffer) === iter.next())
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader") {
    +    // Write data manually for testing the random reader
    +    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    val segments = writeDataManually(writtenData, file)
    +
    +    // Get a random order of these segments and read them back
    +    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    writtenDataAndSegments.foreach { case (data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
    +    // Write data using writer for testing the random reader
    +    val file = new File(tempDirectory, "TestRandomReads")
    +    val data = generateRandomData()
    +    val segments = writeDataUsingWriter(file, data)
    +
    +    // Read a random sequence of segments and verify read data
    +    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    dataAndSegments.foreach { case(data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogManager - write rotating logs") {
    +    // Write data using manager
    +    val dataToWrite = generateRandomData(10)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +
    +    // Read data manually to verify the written data
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val writtenData = logFiles.flatMap { file => readDataManually(file) }
    +    assert(writtenData.toList === dataToWrite.toList)
    +  }
    +
    +  test("WriteAheadLogManager - read rotating logs") {
    +    // Write data manually for testing reading through manager
    +    val writtenData = (1 to 10).map { i =>
    +      val data = generateRandomData(10)
    +      val file = new File(tempDirectory, s"log-$i-${i + 1}")
    +      writeDataManually(data, file)
    +      data
    +    }.flatten
    +
    +    // Read data using manager and verify
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(readData.toList === writtenData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - recover past logs when creating new manager") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(dataToWrite.toList === readData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - cleanup old logs") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    dataToWrite.foreach { item =>
    +      fakeClock.addToTime(500) // half second for each
    +      manager.writeToLog(item)
    +    }
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
    +    eventually(timeout(1 second), interval(10 milliseconds)) {
    +      assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
    +    }
    +  }
    +
    +  // TODO (Hari, TD): Test different failure conditions of writers and readers.
    +  //  - Failure while reading incomplete/corrupt file
    +}
    +
    +object WriteAheadLogSuite {
    +
    +  private val hadoopConf = new Configuration()
    +
    +  /**
    +   * Write data to the file and returns the an array of the bytes written.
    +   * This is used to test the WAL reader independently of the WAL writer.
    +   */
    +  def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = {
    +    val segments = new ArrayBuffer[FileSegment]()
    +    val writer = new RandomAccessFile(file, "rw")
    +    data.foreach { item =>
    +      val offset = writer.getFilePointer()
    +      val bytes = Utils.serialize(item)
    +      writer.writeInt(bytes.size)
    +      writer.write(bytes)
    +      segments += FileSegment(file.toString, offset, bytes.size)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = {
    +    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
    +    val segments = data.map {
    +      item => writer.write(item)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingManager(logDirectory: File, data: Seq[String]) {
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    data.foreach { item =>
    +      fakeClock.addToTime(500)
    +      manager.writeToLog(item)
    +    }
    +    manager.stop()
    +  }
    +
    +  /**
    +   * Read data from the given segments of log file and returns the read list of byte buffers.
    +   * This is used to test the WAL writer independently of the WAL reader.
    +   */
    +  def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
    +    val reader = new RandomAccessFile(file, "r")
    +    segments.map { x =>
    +      reader.seek(x.offset)
    +      val data = new Array[Byte](x.length)
    +      reader.readInt()
    +      reader.readFully(data)
    +      Utils.deserialize[String](data)
    +    }
    +  }
    +
    +  def readDataManually(file: File): Seq[String] = {
    +    val reader = new DataInputStream(new FileInputStream(file))
    +    val buffer = new ArrayBuffer[String]
    +    try {
    +      while (reader.available > 0) {
    +        val length = reader.readInt()
    +        val bytes = new Array[Byte](length)
    +        reader.read(bytes)
    +        buffer += Utils.deserialize[String](bytes)
    +      }
    +    } finally {
    +      reader.close()
    +    }
    +    buffer
    +  }
    +
    +  def readDataUsingManager(logDirectory: File): Seq[String] = {
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      callerName = "WriteAheadLogSuite")
    +    val data = manager.readFromLog().map(byteBufferToString).toSeq
    +    manager.stop()
    +    data
    +  }
    +
    +  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
    +    (1 to numItems).map { _.toString }
    +  }
    +
    +  def getLogFilesInDirectory(directory: File): Seq[File] = {
    +    if (directory.exists) {
    +      directory.listFiles().filter(_.getName().startsWith("log-"))
    +        .sortBy(_.getName.split("-")(1).toLong)
    +    } else {
    +      Seq.empty
    +    }
    +  }
    +
    +  def printData(data: Seq[String]) {
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60183932
  
    1 is not an issue at all. mvn will exclude any dependencies whose versions are hard-coded. So in any profile, minicluster will pull in the correct versions, and we don't bundle it since we don't actually package it. Exclusions don't come in the way - which is why maven build works fine (see Flume for example - we build against any arbitrary HDFS version and use minicluster in our tests without issues. There are other projects which build against arbitrary HDFS versions and still use the minicluster).
    
    I don't like (2) - we made test fixes which we didn't see in the local tests. I'd rather keep it.
    
    This definitely seems like sbt-mvn resolver related issue. Since the tests are fine on mvn, it looks like new top-level dependencies are somehow not getting pulled in. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318338
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    --- End diff --
    
    If someone explicitly passes null. But checking that can be avoided.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19198008
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    val segments = dataToWrite.map(data => writer.write(data))
    +    writer.close()
    +    val writtenData = readDataManually(file, segments)
    +    assert(writtenData.toArray === dataToWrite.toArray)
    +  }
    +
    +  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    dataToWrite.foreach { data =>
    +      val segment = writer.write(data)
    +      assert(readDataManually(file, Seq(segment)).head === data)
    +    }
    +    writer.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data") {
    +    // Write data manually for testing the sequential reader
    +    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    writeDataManually(writtenData, file)
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    val readData = reader.toSeq.map(byteBufferToString)
    +    assert(readData.toList === writtenData.toList)
    +    assert(reader.hasNext === false)
    +    intercept[Exception] {
    +      reader.next()
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data written with writer") {
    +    // Write data manually for testing the sequential reader
    +    val file = new File(tempDirectory, "TestWriter")
    +    val dataToWrite = generateRandomData()
    +    writeDataUsingWriter(file, dataToWrite)
    +    val iter = dataToWrite.iterator
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    reader.foreach { byteBuffer =>
    +      assert(byteBufferToString(byteBuffer) === iter.next())
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader") {
    +    // Write data manually for testing the random reader
    +    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    val segments = writeDataManually(writtenData, file)
    +
    +    // Get a random order of these segments and read them back
    +    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    writtenDataAndSegments.foreach { case (data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
    +    // Write data using writer for testing the random reader
    +    val file = new File(tempDirectory, "TestRandomReads")
    +    val data = generateRandomData()
    +    val segments = writeDataUsingWriter(file, data)
    +
    +    // Read a random sequence of segments and verify read data
    +    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    dataAndSegments.foreach { case(data, segment) =>
    --- End diff --
    
    after or before?
    It should be 
    `dataAndSegments.foreach { case (data, segment) =>`
    isnt it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193790
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    +  }
    +
    +  /**
    +   * Delete the log files that are older than the threshold time.
    +   *
    +   * Its important to note that the threshold time is based on the time stamps used in the log
    +   * files, which is usually based on the local system time. So if there is coordination necessary
    +   * between the node calculating the threshTime (say, driver node), and the local system time
    +   * (say, worker node), the caller has to take account of possible time skew.
    +   */
    +  def cleanupOldLogs(threshTime: Long): Unit = {
    +    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    +      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    +
    +    def deleteFiles() {
    +      oldLogFiles.foreach { logInfo =>
    +        try {
    +          val path = new Path(logInfo.path)
    +          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
    +          fs.delete(path, true)
    +          synchronized { pastLogs -= logInfo }
    +          logDebug(s"Cleared log file $logInfo")
    +        } catch {
    +          case ex: Exception =>
    +            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    +        }
    +      }
    +      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    +    }
    +    if (!executionContext.isShutdown) {
    +      Future { deleteFiles() }
    +    }
    +  }
    +
    +  /** Stop the manager, close any open log writer */
    +  def stop(): Unit = synchronized {
    +    if (currentLogWriter != null) {
    +      currentLogWriter.close()
    +    }
    +    executionContext.shutdown()
    +    logInfo("Stopped write ahead log manager")
    +  }
    +
    +  /** Get the current log writer while taking care of rotation */
    +  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    +    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    +      resetWriter()
    +      if (currentLogPath != null) {
    +        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
    +      }
    +      currentLogWriterStartTime = currentTime
    +      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    +      val newLogPath = new Path(logDirectory,
    +        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    +      currentLogPath = newLogPath.toString
    +      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    +    }
    +    currentLogWriter
    +  }
    +
    +  /** Initialize the log directory or recover existing logs inside the directory */
    +  private def initializeOrRecover(): Unit = synchronized {
    +    val logDirectoryPath = new Path(logDirectory)
    +    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
    +
    +    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    +      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
    +      pastLogs.clear()
    +      pastLogs ++= logFileInfo
    +      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
    +      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    +    } else {
    +      fileSystem.mkdirs(logDirectoryPath,
    +        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
    --- End diff --
    
    Can these permissions be narrower?  Do we need group readability, or would user-readability suffice?  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318649
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    --- End diff --
    
    Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60333829
  
    Alright, thanks! I will merge when this last set of changes gets through jenkins.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-59993367
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22006/consoleFull) for   PR 2882 at commit [`4ab602a`](https://github.com/apache/spark/commit/4ab602a0074a2144d33367229358c19d079798d8).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19191068
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    Yeah, that would be fine :).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195483
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +
    +    val dfsPath = new Path(path)
    +    val dfs =
    --- End diff --
    
    Minor style nit but the extra newline here looks strange.  I'd put the `this.synchronized` on this line, too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318631
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    // If the file exists and we have append support, append instead of creating a new file
    +    val stream: FSDataOutputStream = {
    +      if (dfs.isFile(dfsPath)) {
    +        if (conf.getBoolean("hdfs.append.support", false)) {
    +          dfs.append(dfsPath)
    +        } else {
    +          throw new IllegalStateException("File exists and there is no append support!")
    +        }
    +      } else {
    +        dfs.create(dfsPath)
    +      }
    +    }
    +    stream
    +  }
    +
    +  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val instream = dfs.open(dfsPath)
    +    instream
    +  }
    +
    +  def checkState(state: Boolean, errorMsg: => String) {
    +    if (!state) {
    +      throw new IllegalStateException(errorMsg)
    +    }
    +  }
    +
    +  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
    +    val dfsPath = new Path(path)
    +    val dfs = getFileSystemForPath(dfsPath, conf)
    +    val fileStatus = dfs.getFileStatus(dfsPath)
    +    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
    +    blockLocs.map(_.flatMap(_.getHosts))
    +  }
    +
    +  def getFileSystemForPath(path: Path, conf: Configuration) = synchronized {
    +    val fs = path.getFileSystem(conf)
    --- End diff --
    
    @harishreedharan Can you elaborate on why `getFileSystem` is not thread-safe? References?
    And if it is indeed not thread-safe, then doing synchronization here does not solve the problem because other threads in spark could be access getFileSystem at the same time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60213891
  
    @JoshRosen
    @harishreedharan  addressed all your comments, and also simplified the writer code
    I did some further cleanups, and also added two new unit tests that test the writer and manager with corrupted writes. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60167866
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22046/consoleFull) for   PR 2882 at commit [`ef8db09`](https://github.com/apache/spark/commit/ef8db09075ab6d7e29a9e988bb83af16f3c553ca).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60004014
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22006/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60185449
  
    I have the env vars set to use hadoop-2.4


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60208456
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22067/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19190985
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    How about `WriteAheadLogFileSegment`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195251
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{Closeable, EOFException}
    +import java.nio.ByteBuffer
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.Logging
    +
    +/**
    + * A reader for reading write ahead log files written using
    + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
    + * the records (bytebuffers) in the log file sequentially and return them as an
    + * iterator of bytebuffers.
    + */
    +private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
    +  extends Iterator[ByteBuffer] with Closeable with Logging {
    +
    +  private val instream = HdfsUtils.getInputStream(path, conf)
    +  private var closed = false
    +  private var nextItem: Option[ByteBuffer] = None
    +
    +  override def hasNext: Boolean = synchronized {
    +    if (closed) {
    +      return false
    +    }
    +
    +    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
    +      true
    +    } else {
    +      try {
    +        val length = instream.readInt()
    +        val buffer = new Array[Byte](length)
    +        instream.readFully(buffer)
    +        nextItem = Some(ByteBuffer.wrap(buffer))
    +        logTrace("Read next item " + nextItem.get)
    +        true
    +      } catch {
    +        case e: EOFException =>
    +          logDebug("Error reading next item, EOF reached", e)
    +          close()
    +          false
    +        case e: Exception =>
    +          logDebug("Error reading next item, EOF reached", e)
    --- End diff --
    
    Will fix in the next iter


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60208450
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22067/consoleFull) for   PR 2882 at commit [`3881706`](https://github.com/apache/spark/commit/38817069e66cc8c161cc2a8033873a3342cff4e2).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class LogInfo(startTime: Long, endTime: Long, path: String)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318155
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    --- End diff --
    
    How could callerName be null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19184821
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    --- End diff --
    
    WIP: this file is going to be updated by @harishreedharan to get rid of the local file customizations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19241965
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    +  }
    +
    +  /**
    +   * Delete the log files that are older than the threshold time.
    +   *
    +   * Its important to note that the threshold time is based on the time stamps used in the log
    +   * files, which is usually based on the local system time. So if there is coordination necessary
    +   * between the node calculating the threshTime (say, driver node), and the local system time
    +   * (say, worker node), the caller has to take account of possible time skew.
    +   */
    +  def cleanupOldLogs(threshTime: Long): Unit = {
    +    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    +      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    +
    +    def deleteFiles() {
    +      oldLogFiles.foreach { logInfo =>
    +        try {
    +          val path = new Path(logInfo.path)
    +          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
    +          fs.delete(path, true)
    +          synchronized { pastLogs -= logInfo }
    +          logDebug(s"Cleared log file $logInfo")
    +        } catch {
    +          case ex: Exception =>
    +            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    +        }
    +      }
    +      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    +    }
    +    if (!executionContext.isShutdown) {
    +      Future { deleteFiles() }
    +    }
    +  }
    +
    +  /** Stop the manager, close any open log writer */
    +  def stop(): Unit = synchronized {
    +    if (currentLogWriter != null) {
    +      currentLogWriter.close()
    +    }
    +    executionContext.shutdown()
    +    logInfo("Stopped write ahead log manager")
    +  }
    +
    +  /** Get the current log writer while taking care of rotation */
    +  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    +    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    +      resetWriter()
    +      if (currentLogPath != null) {
    +        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
    +      }
    +      currentLogWriterStartTime = currentTime
    +      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    +      val newLogPath = new Path(logDirectory,
    +        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    +      currentLogPath = newLogPath.toString
    +      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    +    }
    +    currentLogWriter
    +  }
    +
    +  /** Initialize the log directory or recover existing logs inside the directory */
    +  private def initializeOrRecover(): Unit = synchronized {
    +    val logDirectoryPath = new Path(logDirectory)
    +    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
    +
    +    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    +      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
    +      pastLogs.clear()
    +      pastLogs ++= logFileInfo
    +      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
    +      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    +    } else {
    +      fileSystem.mkdirs(logDirectoryPath,
    +        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
    +      logInfo(s"Created ${logDirectory} for write ahead log files")
    --- End diff --
    
    Removed as part of change above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] synchronously write re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-59990371
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22002/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60180203
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22049/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195316
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    +    val uri = new URI(path)
    +    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
    +    val isDefaultLocal = defaultFs == null || defaultFs == "file"
    +
    +    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
    +      assert(!new File(uri.getPath).exists)
    +      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
    +    } else {
    +      Right(HdfsUtils.getOutputStream(path, hadoopConf))
    +    }
    +  }
    +
    +  private lazy val hadoopFlushMethod = {
    +    val cls = classOf[FSDataOutputStream]
    +    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
    --- End diff --
    
    Actually we do, since Spark supports Hadoop 1 to Hadoop 2.5.0 right now. In Hadoop 1.x, the "sync" method did the same thing hflush does in 2.5.0 - so in short we do.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60425750
  
    Let's merge this for now. I will try and find out more about the getFileSystem thread-safety without doAs (which is what we support anyway)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318648
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193795
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    +  }
    +
    +  /**
    +   * Delete the log files that are older than the threshold time.
    +   *
    +   * Its important to note that the threshold time is based on the time stamps used in the log
    +   * files, which is usually based on the local system time. So if there is coordination necessary
    +   * between the node calculating the threshTime (say, driver node), and the local system time
    +   * (say, worker node), the caller has to take account of possible time skew.
    +   */
    +  def cleanupOldLogs(threshTime: Long): Unit = {
    +    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
    +      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    +
    +    def deleteFiles() {
    +      oldLogFiles.foreach { logInfo =>
    +        try {
    +          val path = new Path(logInfo.path)
    +          val fs = hadoopConf.synchronized { path.getFileSystem(hadoopConf) }
    +          fs.delete(path, true)
    +          synchronized { pastLogs -= logInfo }
    +          logDebug(s"Cleared log file $logInfo")
    +        } catch {
    +          case ex: Exception =>
    +            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    +        }
    +      }
    +      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
    +    }
    +    if (!executionContext.isShutdown) {
    +      Future { deleteFiles() }
    +    }
    +  }
    +
    +  /** Stop the manager, close any open log writer */
    +  def stop(): Unit = synchronized {
    +    if (currentLogWriter != null) {
    +      currentLogWriter.close()
    +    }
    +    executionContext.shutdown()
    +    logInfo("Stopped write ahead log manager")
    +  }
    +
    +  /** Get the current log writer while taking care of rotation */
    +  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
    +    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    +      resetWriter()
    +      if (currentLogPath != null) {
    +        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, currentLogPath)
    +      }
    +      currentLogWriterStartTime = currentTime
    +      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    +      val newLogPath = new Path(logDirectory,
    +        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    +      currentLogPath = newLogPath.toString
    +      currentLogWriter = new WriteAheadLogWriter(currentLogPath, hadoopConf)
    +    }
    +    currentLogWriter
    +  }
    +
    +  /** Initialize the log directory or recover existing logs inside the directory */
    +  private def initializeOrRecover(): Unit = synchronized {
    +    val logDirectoryPath = new Path(logDirectory)
    +    val fileSystem = logDirectoryPath.getFileSystem(hadoopConf)
    +
    +    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
    +      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
    +      pastLogs.clear()
    +      pastLogs ++= logFileInfo
    +      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
    +      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
    +    } else {
    +      fileSystem.mkdirs(logDirectoryPath,
    +        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
    +      logInfo(s"Created ${logDirectory} for write ahead log files")
    --- End diff --
    
    Don't need the curly braces for this string interpolation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60219275
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22068/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195386
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    --- End diff --
    
    I am refactoring much of the tests right now. So we won't use local files at all - so we should be ok.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60184841
  
    Which profile are you running with ?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318091
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    --- End diff --
    
    Need an extra space before `@param`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60286594
  
    Yay, finally! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195389
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    val segments = dataToWrite.map(data => writer.write(data))
    +    writer.close()
    +    val writtenData = readDataManually(file, segments)
    +    assert(writtenData.toArray === dataToWrite.toArray)
    +  }
    +
    +  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    dataToWrite.foreach { data =>
    +      val segment = writer.write(data)
    +      assert(readDataManually(file, Seq(segment)).head === data)
    +    }
    +    writer.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data") {
    +    // Write data manually for testing the sequential reader
    +    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    writeDataManually(writtenData, file)
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    val readData = reader.toSeq.map(byteBufferToString)
    +    assert(readData.toList === writtenData.toList)
    +    assert(reader.hasNext === false)
    +    intercept[Exception] {
    +      reader.next()
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data written with writer") {
    +    // Write data manually for testing the sequential reader
    +    val file = new File(tempDirectory, "TestWriter")
    +    val dataToWrite = generateRandomData()
    +    writeDataUsingWriter(file, dataToWrite)
    +    val iter = dataToWrite.iterator
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    reader.foreach { byteBuffer =>
    +      assert(byteBufferToString(byteBuffer) === iter.next())
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader") {
    +    // Write data manually for testing the random reader
    +    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    val segments = writeDataManually(writtenData, file)
    +
    +    // Get a random order of these segments and read them back
    +    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    writtenDataAndSegments.foreach { case (data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
    +    // Write data using writer for testing the random reader
    +    val file = new File(tempDirectory, "TestRandomReads")
    +    val data = generateRandomData()
    +    val segments = writeDataUsingWriter(file, data)
    +
    +    // Read a random sequence of segments and verify read data
    +    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    dataAndSegments.foreach { case(data, segment) =>
    --- End diff --
    
    Space after the open paren here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193702
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    --- End diff --
    
    I think you can just write `.flatten` instead of flatMapping over `identity`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19190926
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    So maybe we can change to another name, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195094
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    +    val uri = new URI(path)
    +    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
    +    val isDefaultLocal = defaultFs == null || defaultFs == "file"
    +
    +    if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
    +      assert(!new File(uri.getPath).exists)
    +      Left(new DataOutputStream(new BufferedOutputStream(new FileOutputStream(uri.getPath))))
    +    } else {
    +      Right(HdfsUtils.getOutputStream(path, hadoopConf))
    +    }
    +  }
    +
    +  private lazy val hadoopFlushMethod = {
    +    val cls = classOf[FSDataOutputStream]
    +    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
    --- End diff --
    
    Nice Scala one-liner :)
    
    Why do we need this reflection, though?  Is this necessary to support multiple Hadoop versions?  If so, could you add a one-line comment to explain this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60171305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22046/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60183139
  
    Have you done a sbt/sbt clean? I did it. 
    Either ways, there is a real problem. The hdfs-minicluster depends on hdfs-core stuff, a lot of whose dependencies have been excluded in the core/pom.xml. That's because we dont want HDFS versions specific dependencies to interfere with Spark build, This is okay since we dont need to run HDFS, just interface with it. But in this case, since we need to run HDFS as a minicluster, those exclusions are a problem. 
    
    There are probably two possible solutions
    1. Based on maven profile, reinclude the excluded stuff in test and run the mini-cluster. This is going to painfully complicated.
    2.  Not use mini-cluster, and rather have two different code paths for local file system and hdfs. More complex code, but simpler to maintain, though the unit test will not really test HDFS flushes correctly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60333918
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22103/consoleFull) for   PR 2882 at commit [`e4bee20`](https://github.com/apache/spark/commit/e4bee2065293d7373c43fe5636dd9971dede257e).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195244
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    --- End diff --
    
    It might be nice to explicitly state that this write is synchronous / blocking, if that's the case.  If I call this and it returns, am I guaranteed that my log record is durably stored in HDFS and readable during recovery?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60322270
  
    @JoshRosen whenever you get a chance. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195548
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    +
    +    val dfsPath = new Path(path)
    +    val dfs =
    +      this.synchronized {
    +        dfsPath.getFileSystem(conf)
    +      }
    +    // If the file exists and we have append support, append instead of creating a new file
    +    val stream: FSDataOutputStream = {
    +      if (dfs.isFile(dfsPath)) {
    +        if (conf.getBoolean("hdfs.append.support", false)) {
    +          dfs.append(dfsPath)
    +        } else {
    +          throw new IllegalStateException("File exists and there is no append support!")
    +        }
    +      } else {
    +        dfs.create(dfsPath)
    +      }
    +    }
    +    stream
    +  }
    +
    +  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
    +    val dfsPath = new Path(path)
    +    val dfs = this.synchronized {
    +      dfsPath.getFileSystem(conf)
    +    }
    +    val instream = dfs.open(dfsPath)
    +    instream
    +  }
    +
    +  def checkState(state: Boolean, errorMsg: => String) {
    +    if(!state) {
    --- End diff --
    
    Space after the `if`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60201254
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22067/consoleFull) for   PR 2882 at commit [`3881706`](https://github.com/apache/spark/commit/38817069e66cc8c161cc2a8033873a3342cff4e2).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19198019
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile}
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.duration._
    +import scala.language.implicitConversions
    +import scala.language.postfixOps
    +import scala.util.Random
    +
    +import WriteAheadLogSuite._
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.spark.util.Utils
    +import org.scalatest.{BeforeAndAfter, FunSuite}
    +import org.scalatest.concurrent.Eventually._
    +
    +/**
    + * This testsuite tests all classes related to write ahead logs.
    + */
    +class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
    +
    +  val hadoopConf = new Configuration()
    +  var tempDirectory: File = null
    +
    +  before {
    +    tempDirectory = Files.createTempDir()
    +  }
    +
    +  after {
    +    if (tempDirectory != null && tempDirectory.exists()) {
    +      FileUtils.deleteDirectory(tempDirectory)
    +      tempDirectory = null
    +    }
    +  }
    +
    +  test("WriteAheadLogWriter - writing data") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    val segments = dataToWrite.map(data => writer.write(data))
    +    writer.close()
    +    val writtenData = readDataManually(file, segments)
    +    assert(writtenData.toArray === dataToWrite.toArray)
    +  }
    +
    +  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
    +    val file = new File(tempDirectory, Random.nextString(10))
    +    val dataToWrite = generateRandomData()
    +    val writer = new WriteAheadLogWriter("file:///" + file, hadoopConf)
    +    dataToWrite.foreach { data =>
    +      val segment = writer.write(data)
    +      assert(readDataManually(file, Seq(segment)).head === data)
    +    }
    +    writer.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data") {
    +    // Write data manually for testing the sequential reader
    +    val file = File.createTempFile("TestSequentialReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    writeDataManually(writtenData, file)
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    val readData = reader.toSeq.map(byteBufferToString)
    +    assert(readData.toList === writtenData.toList)
    +    assert(reader.hasNext === false)
    +    intercept[Exception] {
    +      reader.next()
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogReader - sequentially reading data written with writer") {
    +    // Write data manually for testing the sequential reader
    +    val file = new File(tempDirectory, "TestWriter")
    +    val dataToWrite = generateRandomData()
    +    writeDataUsingWriter(file, dataToWrite)
    +    val iter = dataToWrite.iterator
    +    val reader = new WriteAheadLogReader("file:///" + file.toString, hadoopConf)
    +    reader.foreach { byteBuffer =>
    +      assert(byteBufferToString(byteBuffer) === iter.next())
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader") {
    +    // Write data manually for testing the random reader
    +    val file = File.createTempFile("TestRandomReads", "", tempDirectory)
    +    val writtenData = generateRandomData()
    +    val segments = writeDataManually(writtenData, file)
    +
    +    // Get a random order of these segments and read them back
    +    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    writtenDataAndSegments.foreach { case (data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
    +    // Write data using writer for testing the random reader
    +    val file = new File(tempDirectory, "TestRandomReads")
    +    val data = generateRandomData()
    +    val segments = writeDataUsingWriter(file, data)
    +
    +    // Read a random sequence of segments and verify read data
    +    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
    +    val reader = new WriteAheadLogRandomReader("file:///" + file.toString, hadoopConf)
    +    dataAndSegments.foreach { case(data, segment) =>
    +      assert(data === byteBufferToString(reader.read(segment)))
    +    }
    +    reader.close()
    +  }
    +
    +  test("WriteAheadLogManager - write rotating logs") {
    +    // Write data using manager
    +    val dataToWrite = generateRandomData(10)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +
    +    // Read data manually to verify the written data
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val writtenData = logFiles.flatMap { file => readDataManually(file) }
    +    assert(writtenData.toList === dataToWrite.toList)
    +  }
    +
    +  test("WriteAheadLogManager - read rotating logs") {
    +    // Write data manually for testing reading through manager
    +    val writtenData = (1 to 10).map { i =>
    +      val data = generateRandomData(10)
    +      val file = new File(tempDirectory, s"log-$i-${i + 1}")
    +      writeDataManually(data, file)
    +      data
    +    }.flatten
    +
    +    // Read data using manager and verify
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(readData.toList === writtenData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - recover past logs when creating new manager") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    writeDataUsingManager(tempDirectory, dataToWrite)
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    val readData = readDataUsingManager(tempDirectory)
    +    assert(dataToWrite.toList === readData.toList)
    +  }
    +
    +  test("WriteAheadLogManager - cleanup old logs") {
    +    // Write data with manager, recover with new manager and verify
    +    val dataToWrite = generateRandomData(100)
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(tempDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    dataToWrite.foreach { item =>
    +      fakeClock.addToTime(500) // half second for each
    +      manager.writeToLog(item)
    +    }
    +    val logFiles = getLogFilesInDirectory(tempDirectory)
    +    assert(logFiles.size > 1)
    +    manager.cleanupOldLogs(fakeClock.currentTime() / 2)
    +    eventually(timeout(1 second), interval(10 milliseconds)) {
    +      assert(getLogFilesInDirectory(tempDirectory).size < logFiles.size)
    +    }
    +  }
    +
    +  // TODO (Hari, TD): Test different failure conditions of writers and readers.
    +  //  - Failure while reading incomplete/corrupt file
    +}
    +
    +object WriteAheadLogSuite {
    +
    +  private val hadoopConf = new Configuration()
    +
    +  /**
    +   * Write data to the file and returns the an array of the bytes written.
    +   * This is used to test the WAL reader independently of the WAL writer.
    +   */
    +  def writeDataManually(data: Seq[String], file: File): Seq[FileSegment] = {
    +    val segments = new ArrayBuffer[FileSegment]()
    +    val writer = new RandomAccessFile(file, "rw")
    +    data.foreach { item =>
    +      val offset = writer.getFilePointer()
    +      val bytes = Utils.serialize(item)
    +      writer.writeInt(bytes.size)
    +      writer.write(bytes)
    +      segments += FileSegment(file.toString, offset, bytes.size)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingWriter(file: File, data: Seq[String]): Seq[FileSegment] = {
    +    val writer = new WriteAheadLogWriter(file.toString, hadoopConf)
    +    val segments = data.map {
    +      item => writer.write(item)
    +    }
    +    writer.close()
    +    segments
    +  }
    +
    +  def writeDataUsingManager(logDirectory: File, data: Seq[String]) {
    +    val fakeClock = new ManualClock
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = fakeClock)
    +    data.foreach { item =>
    +      fakeClock.addToTime(500)
    +      manager.writeToLog(item)
    +    }
    +    manager.stop()
    +  }
    +
    +  /**
    +   * Read data from the given segments of log file and returns the read list of byte buffers.
    +   * This is used to test the WAL writer independently of the WAL reader.
    +   */
    +  def readDataManually(file: File, segments: Seq[FileSegment]): Seq[String] = {
    +    val reader = new RandomAccessFile(file, "r")
    +    segments.map { x =>
    +      reader.seek(x.offset)
    +      val data = new Array[Byte](x.length)
    +      reader.readInt()
    +      reader.readFully(data)
    +      Utils.deserialize[String](data)
    +    }
    +  }
    +
    +  def readDataManually(file: File): Seq[String] = {
    +    val reader = new DataInputStream(new FileInputStream(file))
    +    val buffer = new ArrayBuffer[String]
    +    try {
    +      while (reader.available > 0) {
    +        val length = reader.readInt()
    +        val bytes = new Array[Byte](length)
    +        reader.read(bytes)
    +        buffer += Utils.deserialize[String](bytes)
    +      }
    +    } finally {
    +      reader.close()
    +    }
    +    buffer
    +  }
    +
    +  def readDataUsingManager(logDirectory: File): Seq[String] = {
    +    val manager = new WriteAheadLogManager(logDirectory.toString, hadoopConf,
    +      callerName = "WriteAheadLogSuite")
    +    val data = manager.readFromLog().map(byteBufferToString).toSeq
    +    manager.stop()
    +    data
    +  }
    +
    +  def generateRandomData(numItems: Int = 50, itemSize: Int = 50): Seq[String] = {
    +    (1 to numItems).map { _.toString }
    +  }
    +
    +  def getLogFilesInDirectory(directory: File): Seq[File] = {
    +    if (directory.exists) {
    +      directory.listFiles().filter(_.getName().startsWith("log-"))
    +        .sortBy(_.getName.split("-")(1).toLong)
    +    } else {
    +      Seq.empty
    +    }
    +  }
    +
    +  def printData(data: Seq[String]) {
    --- End diff --
    
    Right. @harishreedharan can you remove this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2882


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60274484
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22075/consoleFull) for   PR 2882 at commit [`d29fddd`](https://github.com/apache/spark/commit/d29fddd880fd7efec8ed05017a12600bcb2aa829).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19190537
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    Good point. But that FileSegment is for local files and uses java's File to store the file name. So can;t use that. And I did not want to change anything in core because of streaming.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195209
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala ---
    @@ -0,0 +1,19 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +private[streaming] case class FileSegment (path: String, offset: Long, length: Int)
    --- End diff --
    
    Will make the change in the next update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193688
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    --- End diff --
    
    Why not make `currentLogPath` into an `Option[String]`?  This seems clearer than wrapping a null into an option.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318524
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    --- End diff --
    
    Moved.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60180749
  
    Any idea why MiniDFSCluster can't be resolved via sbt, via it works fine via mvn?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60181254
  
    I am running the test suite locally with sbt, and getting this!
    ```
    [error] Uncaught exception when running org.apache.spark.streaming.util.WriteAheadLogSuite: java.lang.NoClassDefFoundError: org/mortbay/thread/ThreadPool
    sbt.ForkMain$ForkError: org/mortbay/thread/ThreadPool
    	at org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:358)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:353)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:415)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:353)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:305)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:496)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1279)
    	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:277)
    	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:124)
    	at org.apache.spark.streaming.util.WriteAheadLogSuite.<init>(WriteAheadLogSuite.scala:46)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    	at java.lang.Class.newInstance(Class.java:374)
    	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:744)
    Caused by: sbt.ForkMain$ForkError: org.mortbay.thread.ThreadPool
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:358)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode$1.run(NameNode.java:353)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at javax.security.auth.Subject.doAs(Subject.java:415)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.startHttpServer(NameNode.java:353)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(NameNode.java:305)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.<init>(NameNode.java:496)
    	at org.apache.hadoop.hdfs.server.namenode.NameNode.createNameNode(NameNode.java:1279)
    	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:277)
    	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:124)
    	at org.apache.spark.streaming.util.WriteAheadLogSuite.<init>(WriteAheadLogSuite.scala:46)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    	at java.lang.Class.newInstance(Class.java:374)
    	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:621)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:294)
    	at sbt.ForkMain$Run$2.call(ForkMain.java:284)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:744)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] synchronously write re...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-59989801
  
    Please review this @JoshRosen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60429942
  
    Talked to @cmccabe who says we should not worry about the thread-safety. If at all there was an issue, it was in too old a version which we need not worry about. Let's merge this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19194043
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.io._
    +import java.net.URI
    +import java.nio.ByteBuffer
    +
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
    +
    +/**
    + * A writer for writing byte-buffers to a write ahead log file.
    + */
    +private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
    +  extends Closeable {
    +  private val underlyingStream: Either[DataOutputStream, FSDataOutputStream] = {
    --- End diff --
    
    Ah, that makes sense.  I guess you can still use the HDFS API to write to local files for testing purposes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19193668
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    --- End diff --
    
    Extra space after past.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19195364
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    --- End diff --
    
    Yes, that is right. I believe we are lacking Scaladocs everywhere - we should do it I guess.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60047058
  
    Yes, the deletion is completely application specific and it is left for the application of the WAL to decide when to cleanup. 
    
    In case of Spark Streaming, the scheduler of Spark Streaming intelligently keeps track of which batch is completely done, so any associated data + metadata can be cleaned.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-60181713
  
    I am getting the same error as Jenkins on sbt. Looks like sbt somehow is not adding the Minicluster jar to classpath for the tests, though it runs fine in mvn and intellij!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318248
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import java.nio.ByteBuffer
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.concurrent.{ExecutionContext, Future}
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.Path
    +import org.apache.hadoop.fs.permission.FsPermission
    +import org.apache.spark.Logging
    +import org.apache.spark.util.Utils
    +import WriteAheadLogManager._
    +
    +/**
    + * This class manages write ahead log files.
    + * - Writes records (bytebuffers) to periodically rotating log files.
    + * - Recovers the log files and the reads the recovered records upon failures.
    + * - Cleans up old log files.
    + *
    + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
    + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
    + *
    + *@param logDirectory Directory when rotating log files will be created.
    + * @param hadoopConf Hadoop configuration for reading/writing log files.
    + * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
    + *                            Default is one minute.
    + * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
    + *                    Default is three.
    + * @param callerName Optional name of the class who is using this manager.
    + * @param clock Optional clock that is used to check for rotation interval.
    + */
    +private[streaming] class WriteAheadLogManager(
    +    logDirectory: String,
    +    hadoopConf: Configuration,
    +    rollingIntervalSecs: Int = 60,
    +    maxFailures: Int = 3,
    +    callerName: String = "",
    +    clock: Clock = new SystemClock
    +  ) extends Logging {
    +
    +  private val pastLogs = new ArrayBuffer[LogInfo]
    +  private val callerNameTag =
    +    if (callerName != null && callerName.nonEmpty) s" for $callerName" else ""
    +  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    +  implicit private val executionContext = ExecutionContext.fromExecutorService(
    +    Utils.newDaemonFixedThreadPool(1, threadpoolName))
    +  override protected val logName = s"WriteAheadLogManager $callerNameTag"
    +
    +  private var currentLogPath: String = null
    +  private var currentLogWriter: WriteAheadLogWriter = null
    +  private var currentLogWriterStartTime: Long = -1L
    +  private var currentLogWriterStopTime: Long = -1L
    +
    +  initializeOrRecover()
    +
    +  /** Write a byte buffer to the log file */
    +  def writeToLog(byteBuffer: ByteBuffer): FileSegment = synchronized {
    +    var fileSegment: FileSegment = null
    +    var failures = 0
    +    var lastException: Exception = null
    +    var succeeded = false
    +    while (!succeeded && failures < maxFailures) {
    +      try {
    +        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
    +        succeeded = true
    +      } catch {
    +        case ex: Exception =>
    +          lastException = ex
    +          logWarning("Failed to write to write ahead log")
    +          resetWriter()
    +          failures += 1
    +      }
    +    }
    +    if (fileSegment == null) {
    +      logError(s"Failed to write to write ahead log after $failures failures")
    +      throw lastException
    +    }
    +    fileSegment
    +  }
    +
    +  /**
    +   * Read all the existing logs from the log directory.
    +   *
    +   * Note that this is typically called when the caller is initializing and wants
    +   * to recover past  state from the write ahead logs (that is, before making any writes).
    +   * If this is called after writes have been made using this manager, then it may not return
    +   * the latest the records. This does not deal with currently active log files, and
    +   * hence the implementation is kept simple.
    +   */
    +  def readFromLog(): Iterator[ByteBuffer] = synchronized {
    +    val logFilesToRead = pastLogs.map{ _.path} ++ Option(currentLogPath)
    +    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
    +    logFilesToRead.iterator.map { file =>
    +        logDebug(s"Creating log reader with $file")
    +        new WriteAheadLogReader(file, hadoopConf)
    +    } flatMap { x => x }
    --- End diff --
    
    Ah, that makes sense, actually: `flatMap(identity)` on an iterator here is the equivalent of Python's itertools.chain, whereas `flatten` might imply materializing the flattened collection, which doesn't make sense for an iterator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] Write ahead log manage...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2882#discussion_r19318044
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.streaming.util
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{LocalFileSystem, FSDataInputStream, FSDataOutputStream, Path}
    +
    +private[streaming] object HdfsUtils {
    +
    +  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    +    // HDFS is not thread-safe when getFileSystem is called, so synchronize on that
    --- End diff --
    
    It looks like this comment is no longer relevant, or perhaps like it should be moved somewhere else?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4026][Streaming] synchronously write re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2882#issuecomment-59990329
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/22002/consoleFull) for   PR 2882 at commit [`5182ffb`](https://github.com/apache/spark/commit/5182ffb3053a143f221f1e56ed21e2461b4d9e4f).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org