You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/04/29 22:06:23 UTC

[2/2] spark git commit: [SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable

[SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable

Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan.

https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing

Things to add.
* Unit tests for WriteAheadLogUtils

Author: Tathagata Das <ta...@gmail.com>

Closes #5645 from tdas/wal-pluggable and squashes the following commits:

2c431fd [Tathagata Das] Minor fixes.
c2bc7384 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1868bd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1868bd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1868bd40

Branch: refs/heads/master
Commit: 1868bd40dcce23990b98748b0239bd00452b1ca5
Parents: c0c0ba6
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Apr 29 13:06:11 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 29 13:06:11 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/KafkaUtils.scala      |   3 +-
 .../spark/streaming/util/WriteAheadLog.java     |  60 +++++
 .../util/WriteAheadLogRecordHandle.java         |  30 +++
 .../dstream/ReceiverInputDStream.scala          |   2 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |  79 ++++--
 .../receiver/ReceivedBlockHandler.scala         |  38 ++-
 .../receiver/ReceiverSupervisorImpl.scala       |   5 +-
 .../scheduler/ReceivedBlockTracker.scala        |  38 ++-
 .../streaming/scheduler/ReceiverTracker.scala   |   3 +-
 .../streaming/util/FileBasedWriteAheadLog.scala | 249 +++++++++++++++++++
 .../FileBasedWriteAheadLogRandomReader.scala    |  54 ++++
 .../util/FileBasedWriteAheadLogReader.scala     |  82 ++++++
 .../util/FileBasedWriteAheadLogSegment.scala    |  21 ++
 .../util/FileBasedWriteAheadLogWriter.scala     |  81 ++++++
 .../util/WriteAheadLogFileSegment.scala         |  20 --
 .../streaming/util/WriteAheadLogManager.scala   | 233 -----------------
 .../util/WriteAheadLogRandomReader.scala        |  54 ----
 .../streaming/util/WriteAheadLogReader.scala    |  82 ------
 .../streaming/util/WriteAheadLogUtils.scala     | 129 ++++++++++
 .../streaming/util/WriteAheadLogWriter.scala    |  82 ------
 .../spark/streaming/JavaWriteAheadLogSuite.java | 129 ++++++++++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  18 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   |  28 ++-
 .../apache/spark/streaming/ReceiverSuite.scala  |   2 +-
 .../spark/streaming/StreamingContextSuite.scala |   4 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  |  31 +--
 .../streaming/util/WriteAheadLogSuite.scala     | 194 ++++++++++-----
 27 files changed, 1115 insertions(+), 636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0721dda..d7cf500 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -31,6 +31,7 @@ import kafka.message.MessageAndMetadata
 import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
 
 import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
@@ -80,7 +81,7 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
new file mode 100644
index 0000000..8c0fdfa
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
+ * to save the received data (by receivers) and associated metadata to a reliable storage, so that
+ * they can be recovered after driver failures. See the Spark documentation for more information
+ * on how to plug in your own custom implementation of a write ahead log.
+ */
+@org.apache.spark.annotation.DeveloperApi
+public abstract class WriteAheadLog {
+  /**
+   * Write the record to the log and return a record handle, which contains all the information
+   * necessary to read back the written record. The time is used to index the record,
+   * such that it can be cleaned later. Note that implementations of this abstract class must
+   * ensure that the written data is durable and readable (using the record handle) by the
+   * time this function returns.
+   */
+  abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);
+
+  /**
+   * Read a written record based on the given record handle.
+   */
+  abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);
+
+  /**
+   * Read and return an iterator of all the records that have been written but not yet cleaned up.
+   */
+  abstract public Iterator<ByteBuffer> readAll();
+
+  /**
+   * Clean all the records that are older than the threshold time. It can wait for
+   * the completion of the deletion.
+   */
+  abstract public void clean(long threshTime, boolean waitForCompletion);
+
+  /**
+   * Close this log and release any resources.
+   */
+  abstract public void close();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
new file mode 100644
index 0000000..0232418
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util;
+
+/**
+ * This abstract class represents a handle that refers to a record written in a
+ * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
+ * It must contain all the information necessary for the record to be read and returned by
+ * an implementation of the WriteAheadLog class.
+ *
+ * @see org.apache.spark.streaming.util.WriteAheadLog
+ */
+@org.apache.spark.annotation.DeveloperApi
+public abstract class WriteAheadLogRecordHandle implements java.io.Serializable {
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 8be0431..4c7fd2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         // WriteAheadLogBackedBlockRDD else create simple BlockRDD.
         if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
           val logSegments = blockStoreResults.map {
-            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
+            _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
           }.toArray
           // Since storeInBlockManager = false, the storage level does not matter.
           new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 93caa4b..ebdf418 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,14 +16,17 @@
  */
 package org.apache.spark.streaming.rdd
 
+import java.nio.ByteBuffer
+
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
+import org.apache.spark.streaming.util._
 
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -31,26 +34,27 @@ import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, Wri
  * the segment of the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
- * @param segment segment of the write ahead log having the partition data
+ * @param walRecordHandle Handle of the record in a write ahead log having the partition data
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDDPartition(
     val index: Int,
     val blockId: BlockId,
-    val segment: WriteAheadLogFileSegment)
+    val walRecordHandle: WriteAheadLogRecordHandle)
   extends Partition
 
 
 /**
  * This class represents a special case of the BlockRDD where the data blocks in
- * the block manager are also backed by segments in write ahead logs. For reading
+ * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment.
+ * If it does not find them, it looks up the corresponding data in the write ahead log.
  *
  * @param sc SparkContext
  * @param blockIds Ids of the blocks that contains this RDD's data
- * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
+ * @param storeInBlockManager Whether to store in the block manager after reading
+ *                            from the WAL record
  * @param storageLevel storage level to store when storing in block manager
  *                     (applicable when storeInBlockManager = true)
  */
@@ -58,15 +62,15 @@ private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
     @transient blockIds: Array[BlockId],
-    @transient segments: Array[WriteAheadLogFileSegment],
+    @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
     storeInBlockManager: Boolean,
     storageLevel: StorageLevel)
   extends BlockRDD[T](sc, blockIds) {
 
   require(
-    blockIds.length == segments.length,
+    blockIds.length == walRecordHandles.length,
     s"Number of block ids (${blockIds.length}) must be " +
-      s"the same as number of segments (${segments.length}})!")
+      s"the same as number of WAL record handles (${walRecordHandles.length}})!")
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
@@ -75,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def getPartitions: Array[Partition] = {
     assertValid()
     Array.tabulate(blockIds.size) { i =>
-      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
     }
   }
 
   /**
    * Gets the partition data by getting the corresponding block from the block manager.
-   * If the block does not exist, then the data is read from the corresponding segment
+   * If the block does not exist, then the data is read from the corresponding record
    * in write ahead log files.
    */
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -96,10 +100,35 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         logDebug(s"Read partition data of $this from block manager, block $blockId")
         iterator
       case None => // Data not found in Block Manager, grab it from write ahead log file
-        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
-        val dataRead = reader.read(partition.segment)
-        reader.close()
-        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+        var dataRead: ByteBuffer = null
+        var writeAheadLog: WriteAheadLog = null
+        try {
+          // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+          // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+          // writing log data. However, the directory is not needed if data needs to be read, hence
+          // a dummy path is provided to satisfy the method parameter requirements.
+          // FileBasedWriteAheadLog will not create any file or directory at that path.
+          val dummyDirectory = FileUtils.getTempDirectoryPath()
+          writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+            SparkEnv.get.conf, dummyDirectory, hadoopConf)
+          dataRead = writeAheadLog.read(partition.walRecordHandle)
+        } catch {
+          case NonFatal(e) =>
+            throw new SparkException(
+              s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+        } finally {
+          if (writeAheadLog != null) {
+            writeAheadLog.close()
+            writeAheadLog = null
+          }
+        }
+        if (dataRead == null) {
+          throw new SparkException(
+            s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+              s"read returned null")
+        }
+        logInfo(s"Read partition data of $this from write ahead log, record handle " +
+          partition.walRecordHandle)
         if (storeInBlockManager) {
           blockManager.putBytes(blockId, dataRead, storageLevel)
           logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -111,14 +140,20 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
 
   /**
    * Get the preferred location of the partition. This returns the locations of the block
-   * if it is present in the block manager, else it returns the location of the
-   * corresponding segment in HDFS.
+   * if it is present in the block manager, else if FileBasedWriteAheadLogSegment is used,
+   * it returns the location of the corresponding file segment in HDFS.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockLocations = getBlockIdLocations().get(partition.blockId)
-    blockLocations.getOrElse(
-      HdfsUtils.getFileSegmentLocations(
-        partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
+    blockLocations.getOrElse {
+      partition.walRecordHandle match {
+        case fileSegment: FileBasedWriteAheadLogSegment =>
+          HdfsUtils.getFileSegmentLocations(
+            fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+        case _ =>
+          Seq.empty
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 297bf04..4b3d9ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,18 +17,18 @@
 
 package org.apache.spark.streaming.receiver
 
-import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.{existentials, postfixOps}
 
-import WriteAheadLogBasedBlockHandler._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
+import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
-    segment: WriteAheadLogFileSegment
+    walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
 
@@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
   private val blockStoreTimeout = conf.getInt(
     "spark.streaming.receiver.blockStoreTimeout", 30).seconds
-  private val rollingInterval = conf.getInt(
-    "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
-  private val maxFailures = conf.getInt(
-    "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
   private val effectiveStorageLevel = {
     if (storageLevel.deserialized) {
@@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
       s"$effectiveStorageLevel when write ahead log is enabled")
   }
 
-  // Manages rolling log files
-  private val logManager = new WriteAheadLogManager(
-    checkpointDirToLogDir(checkpointDir, streamId),
-    hadoopConf, rollingInterval, maxFailures,
-    callerName = this.getClass.getSimpleName,
-    clock = clock
-  )
+  // Write ahead log to which the serialized received blocks are written
+  private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+    conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf)
 
   // For processing futures used in parallel block storing into block manager and write ahead log
   // # threads = 2, so that both writing to BM and WAL can proceed in parallel
@@ -183,21 +175,21 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in write ahead log
     val storeInWriteAheadLogFuture = Future {
-      logManager.writeToLog(serializedBlock)
+      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
     }
 
-    // Combine the futures, wait for both to complete, and return the write ahead log segment
+    // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
-    val segment = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, segment)
+    val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
+    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
-    logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
+    writeAheadLog.clean(threshTime, false)
   }
 
   def stop() {
-    logManager.stop()
+    writeAheadLog.close()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index f237936..93f047b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer
 import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
 
 /**
  * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl(
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
   private val receivedBlockHandler: ReceivedBlockHandler = {
-    if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
       if (checkpointDirOption.isEmpty) {
         throw new SparkException(
           "Cannot enable receiver write-ahead log without checkpoint directory set. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 200cf4e..14e769a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkException, Logging, SparkConf}
 import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
 import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait representing any event in the ReceivedBlockTracker that updates its state. */
 private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -70,7 +70,7 @@ private[streaming] class ReceivedBlockTracker(
 
   private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
   private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
-  private val logManagerOption = createLogManager()
+  private val writeAheadLogOption = createWriteAheadLog()
 
   private var lastAllocatedBatchTime: Time = null
 
@@ -155,12 +155,12 @@ private[streaming] class ReceivedBlockTracker(
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
     timeToAllocatedBlocks --= timesToCleanup
-    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
+    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
   }
 
   /** Stop the block tracker. */
   def stop() {
-    logManagerOption.foreach { _.stop() }
+    writeAheadLogOption.foreach { _.close() }
   }
 
   /**
@@ -190,9 +190,10 @@ private[streaming] class ReceivedBlockTracker(
       timeToAllocatedBlocks --= batchTimes
     }
 
-    logManagerOption.foreach { logManager =>
+    writeAheadLogOption.foreach { writeAheadLog =>
       logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
-      logManager.readFromLog().foreach { byteBuffer =>
+      import scala.collection.JavaConversions._
+      writeAheadLog.readAll().foreach { byteBuffer =>
         logTrace("Recovering record " + byteBuffer)
         Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
           case BlockAdditionEvent(receivedBlockInfo) =>
@@ -208,10 +209,10 @@ private[streaming] class ReceivedBlockTracker(
 
   /** Write an update to the tracker to the write ahead log */
   private def writeToLog(record: ReceivedBlockTrackerLogEvent) {
-    if (isLogManagerEnabled) {
+    if (isWriteAheadLogEnabled) {
       logDebug(s"Writing to log $record")
-      logManagerOption.foreach { logManager =>
-        logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record)))
+      writeAheadLogOption.foreach { logManager =>
+        logManager.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis())
       }
     }
   }
@@ -222,8 +223,8 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Optionally create the write ahead log manager only if the feature is enabled */
-  private def createLogManager(): Option[WriteAheadLogManager] = {
-    if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+  private def createWriteAheadLog(): Option[WriteAheadLog] = {
+    if (WriteAheadLogUtils.enableReceiverLog(conf)) {
       if (checkpointDirOption.isEmpty) {
         throw new SparkException(
           "Cannot enable receiver write-ahead log without checkpoint directory set. " +
@@ -231,19 +232,16 @@ private[streaming] class ReceivedBlockTracker(
             "See documentation for more details.")
       }
       val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
-      val rollingIntervalSecs = conf.getInt(
-        "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
-      val logManager = new WriteAheadLogManager(logDir, hadoopConf,
-        rollingIntervalSecs = rollingIntervalSecs, clock = clock,
-        callerName = "ReceivedBlockHandlerMaster")
-      Some(logManager)
+
+      val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
+      Some(log)
     } else {
       None
     }
   }
 
-  /** Check if the log manager is enabled. This is only used for testing purposes. */
-  private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
+  /** Check if the write ahead log is enabled. This is only used for testing purposes. */
+  private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
 }
 
 private[streaming] object ReceivedBlockTracker {

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index c4ead6f..1af6571 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable.{HashMap, SynchronizedMap}
 import scala.language.existentials
 
+import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
 import org.apache.spark.rpc._
 import org.apache.spark.streaming.{StreamingContext, Time}
@@ -125,7 +126,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
 
     // Signal the receivers to delete old block data
-    if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
       logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
       receiverInfo.values.flatMap { info => Option(info.endpoint) }
         .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
new file mode 100644
index 0000000..9985fed
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.{Iterator => JIterator}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ */
+private[streaming] class FileBasedWriteAheadLog(
+    conf: SparkConf,
+    logDirectory: String,
+    hadoopConf: Configuration,
+    rollingIntervalSecs: Int,
+    maxFailures: Int
+  ) extends WriteAheadLog with Logging {
+
+  import FileBasedWriteAheadLog._
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
+
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: Option[String] = None
+  private var currentLogWriter: FileBasedWriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /**
+   * Write a byte buffer to the log file for `time`. The write is synchronous: when this method
+   * returns, the data has been flushed to HDFS and is available for readers to read. A failed
+   * attempt is retried with a new writer; after `maxFailures` failures the last error is thrown.
+   */
+  def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
+    var fileSegment: FileBasedWriteAheadLogSegment = null
+    var failures = 0
+    var lastException: Exception = null
+    var succeeded = false
+    while (!succeeded && failures < maxFailures) {
+      try {
+        fileSegment = getLogWriter(time).write(byteBuffer)
+        succeeded = true
+      } catch {
+        case ex: Exception =>
+          lastException = ex
+          logWarning("Failed to write to write ahead log")
+          resetWriter() // close the failed writer; the retry creates a new one
+          failures += 1
+      }
+    }
+    if (fileSegment == null) {
+      logError(s"Failed to write to write ahead log after $failures failures")
+      throw lastException
+    }
+    fileSegment
+  }
+
+  /** Read the record at `segment`; the reader is closed even when the read fails. */
+  def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+    val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment]
+    var reader: FileBasedWriteAheadLogRandomReader = null
+    try {
+      reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+      reader.read(fileSegment)
+    } finally {
+      // reader stays null if the constructor threw; avoid masking that error with an NPE
+      if (reader != null) reader.close()
+    }
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest records. This does not deal with currently active log files, and
+   * hence the implementation is kept simple.
+   */
+  def readAll(): JIterator[ByteBuffer] = synchronized {
+    import scala.collection.JavaConversions._ // implicitly converts the result to a JIterator
+    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
+    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+
+    logFilesToRead.iterator.map { file =>
+      logDebug(s"Creating log reader with $file")
+      new FileBasedWriteAheadLogReader(file, hadoopConf)
+    } flatMap { x => x } // each reader is an Iterator[ByteBuffer]; chain their records
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * It's important to note that the threshold time is based on the time stamps used in the log
+   * files, which is usually based on the local system time. So if there is coordination necessary
+   * between the node calculating the threshTime (say, driver node), and the local system time
+   * (say, worker node), the caller has to take account of possible time skew.
+   *
+   * If waitForCompletion is set to true, this method will return only after old logs have been
+   * deleted. This should be set to true only for testing. Else the files will be deleted
+   * asynchronously.
+   */
+  def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
+    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+    def deleteFiles() {
+      oldLogFiles.foreach { logInfo => // NB: shadows the logInfo() logging method here
+        try {
+          val path = new Path(logInfo.path)
+          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+          fs.delete(path, true)
+          synchronized { pastLogs -= logInfo }
+          logDebug(s"Cleared log file $logInfo")
+        } catch {
+          case ex: Exception =>
+            logWarning(s"Error clearing write ahead log file $logInfo", ex)
+        }
+      }
+      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+    }
+    if (!executionContext.isShutdown) { // close() shuts the executor down
+      val f = Future { deleteFiles() }
+      if (waitForCompletion) {
+        import scala.concurrent.duration._
+        Await.ready(f, 1 second) // gives up with a TimeoutException after 1s
+      }
+    }
+  }
+
+
+  /** Close the open log writer, if any, and shut down the executor that runs log cleanup. */
+  def close(): Unit = synchronized {
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+    }
+    executionContext.shutdown()
+    logInfo("Stopped write ahead log manager")
+  }
+
+  /** Get the log writer for `currentTime`, rotating to a new file after the stop time. */
+  private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
+    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+      resetWriter()
+      currentLogPath.foreach {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
+      }
+      currentLogWriterStartTime = currentTime
+      // Multiply as Long: the Int product overflows for intervals above ~24.8 days
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000L)
+      currentLogPath = Some(new Path(logDirectory,
+        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime)).toString)
+      currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
+    }
+    currentLogWriter
+  }
+
+  /** Recover the list of past log files from the log directory, if that directory exists. */
+  private def initializeOrRecover(): Unit = synchronized {
+    val logDirectoryPath = new Path(logDirectory)
+    val fileSystem =  HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+      pastLogs.clear()
+      pastLogs ++= logFileInfo
+      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+    }
+  }
+
+  /** Close the active writer, if any, and drop it so that the next write opens a new one. */
+  private def resetWriter(): Unit = synchronized {
+    Option(currentLogWriter).foreach(_.close())
+    // Only reached when close() did not throw (or there was no writer to close)
+    currentLogWriter = null
+  }
+}
+
+private[streaming] object FileBasedWriteAheadLog {
+
+  case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+  val logFileRegex = """log-(\d+)-(\d+)""".r
+
+  def timeToLogFile(startTime: Long, stopTime: Long): String = {
+    s"log-$startTime-$stopTime"
+  }
+
+  /** Simple name of the first stack frame class outside the WAL, JVM and Scala library. */
+  def getCallerName(): Option[String] = Thread.currentThread.getStackTrace().map(_.getClassName)
+    .find { c => !Seq("WriteAheadLog", "java.lang", "scala.").exists(c.contains) }
+    .flatMap(_.split("\\.").lastOption) // String.split takes a regex, so escape the dot
+
+  /**
+   * Build LogInfo entries for every path whose file name contains `log-<start>-<stop>`,
+   * ordered by ascending start time. Paths without such a name are dropped.
+   */
+  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+    val parsed = files.flatMap { file =>
+      logFileRegex.findFirstMatchIn(file.getName()).map { m =>
+        // group(1) and group(2) are the start and stop timestamps encoded in the name
+        LogInfo(m.group(1).toLong, m.group(2).toLong, file.toString)
+      }
+    }
+    parsed.sortBy { _.startTime }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
new file mode 100644
index 0000000..f716822
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.Closeable
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (ByteBuffer) from the log file.
+ */
+private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
+  extends Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+
+  def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
+    assertOpen()
+    instream.seek(segment.offset)
+    val nextLength = instream.readInt()
+    HdfsUtils.checkState(nextLength == segment.length,
+      s"Expected message length to be ${segment.length}, but was $nextLength")
+    val buffer = new Array[Byte](nextLength)
+    instream.readFully(buffer)
+    ByteBuffer.wrap(buffer)
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    instream.close()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
new file mode 100644
index 0000000..c3bb59f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Configuration)
+  extends Iterator[ByteBuffer] with Closeable with Logging {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+  private var nextItem: Option[ByteBuffer] = None
+
+  override def hasNext: Boolean = synchronized {
+    if (closed) {
+      return false
+    }
+
+    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+      true
+    } else {
+      try {
+        val length = instream.readInt() // a record is an Int length followed by that many bytes
+        val buffer = new Array[Byte](length)
+        instream.readFully(buffer)
+        nextItem = Some(ByteBuffer.wrap(buffer))
+        logTrace("Read next item " + nextItem.get)
+        true
+      } catch {
+        case e: EOFException => // no further complete record: end the iteration and close
+          logDebug("Error reading next item, EOF reached", e)
+          close()
+          false
+        case e: Exception =>
+          logWarning("Error while trying to read data from HDFS.", e)
+          close()
+          throw e
+      }
+    }
+  }
+
+  override def next(): ByteBuffer = synchronized {
+    val data = nextItem.getOrElse {
+      close()
+      throw new IllegalStateException(
+        "next called without calling hasNext or after hasNext returned false")
+    }
+    nextItem = None // Ensure the next hasNext call loads new data.
+    data
+  }
+
+  override def close(): Unit = synchronized {
+    if (!closed) {
+      instream.close()
+    }
+    closed = true
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
new file mode 100644
index 0000000..2e1f152
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
+  extends WriteAheadLogRecordHandle

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
new file mode 100644
index 0000000..e146bec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FSDataOutputStream
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: Configuration)
+  extends Closeable {
+
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
+
+  private lazy val hadoopFlushMethod = {
+    // Use reflection to get the right flush operation
+    val cls = classOf[FSDataOutputStream]
+    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+  }
+
+  private var nextOffset = stream.getPos()
+  private var closed = false
+
+  /** Write the buffer's content (position 0 to limit) as one length-prefixed record. */
+  def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
+    assertOpen()
+    data.rewind() // Rewind to ensure all data in the buffer is retrieved
+    val lengthToWrite = data.remaining()
+    val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
+    stream.writeInt(lengthToWrite)
+    if (data.hasArray) {
+      // Write only the buffer's own window: the backing array may be larger (slice, limit)
+      stream.write(data.array(), data.arrayOffset() + data.position(), lengthToWrite)
+    } else {
+      // Not array-backed: copy through a temp array, which beats byte-by-byte transfer
+      while (data.hasRemaining) {
+        val array = new Array[Byte](data.remaining)
+        data.get(array)
+        stream.write(array)
+      }
+    }
+    flush()
+    nextOffset = stream.getPos()
+    segment
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    stream.close()
+  }
+
+  private def flush() {
+    hadoopFlushMethod.foreach { _.invoke(stream) }
+    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+    stream.getWrappedStream.flush()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
deleted file mode 100644
index 1005a2c..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-/** Class for representing a segment of data in a write ahead log file */
-private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
deleted file mode 100644
index 38a93cc..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.language.postfixOps
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
-import WriteAheadLogManager._
-
-/**
- * This class manages write ahead log files.
- * - Writes records (bytebuffers) to periodically rotating log files.
- * - Recovers the log files and the reads the recovered records upon failures.
- * - Cleans up old log files.
- *
- * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
- * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
- *
- * @param logDirectory Directory when rotating log files will be created.
- * @param hadoopConf Hadoop configuration for reading/writing log files.
- * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
- *                            Default is one minute.
- * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
- *                    Default is three.
- * @param callerName Optional name of the class who is using this manager.
- * @param clock Optional clock that is used to check for rotation interval.
- */
-private[streaming] class WriteAheadLogManager(
-    logDirectory: String,
-    hadoopConf: Configuration,
-    rollingIntervalSecs: Int = 60,
-    maxFailures: Int = 3,
-    callerName: String = "",
-    clock: Clock = new SystemClock
-  ) extends Logging {
-
-  private val pastLogs = new ArrayBuffer[LogInfo]
-  private val callerNameTag =
-    if (callerName.nonEmpty) s" for $callerName" else ""
-  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
-  implicit private val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
-  override protected val logName = s"WriteAheadLogManager $callerNameTag"
-
-  private var currentLogPath: Option[String] = None
-  private var currentLogWriter: WriteAheadLogWriter = null
-  private var currentLogWriterStartTime: Long = -1L
-  private var currentLogWriterStopTime: Long = -1L
-
-  initializeOrRecover()
-
-  /**
-   * Write a byte buffer to the log file. This method synchronously writes the data in the
-   * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
-   * to HDFS, and will be available for readers to read.
-   */
-  def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
-    var fileSegment: WriteAheadLogFileSegment = null
-    var failures = 0
-    var lastException: Exception = null
-    var succeeded = false
-    while (!succeeded && failures < maxFailures) {
-      try {
-        fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
-        succeeded = true
-      } catch {
-        case ex: Exception =>
-          lastException = ex
-          logWarning("Failed to write to write ahead log")
-          resetWriter()
-          failures += 1
-      }
-    }
-    if (fileSegment == null) {
-      logError(s"Failed to write to write ahead log after $failures failures")
-      throw lastException
-    }
-    fileSegment
-  }
-
-  /**
-   * Read all the existing logs from the log directory.
-   *
-   * Note that this is typically called when the caller is initializing and wants
-   * to recover past state from the write ahead logs (that is, before making any writes).
-   * If this is called after writes have been made using this manager, then it may not return
-   * the latest the records. This does not deal with currently active log files, and
-   * hence the implementation is kept simple.
-   */
-  def readFromLog(): Iterator[ByteBuffer] = synchronized {
-    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
-    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-    logFilesToRead.iterator.map { file =>
-      logDebug(s"Creating log reader with $file")
-      new WriteAheadLogReader(file, hadoopConf)
-    } flatMap { x => x }
-  }
-
-  /**
-   * Delete the log files that are older than the threshold time.
-   *
-   * Its important to note that the threshold time is based on the time stamps used in the log
-   * files, which is usually based on the local system time. So if there is coordination necessary
-   * between the node calculating the threshTime (say, driver node), and the local system time
-   * (say, worker node), the caller has to take account of possible time skew.
-   *
-   * If waitForCompletion is set to true, this method will return only after old logs have been
-   * deleted. This should be set to true only for testing. Else the files will be deleted
-   * asynchronously.
-   */
-  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
-    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
-    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
-      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
-
-    def deleteFiles() {
-      oldLogFiles.foreach { logInfo =>
-        try {
-          val path = new Path(logInfo.path)
-          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
-          fs.delete(path, true)
-          synchronized { pastLogs -= logInfo }
-          logDebug(s"Cleared log file $logInfo")
-        } catch {
-          case ex: Exception =>
-            logWarning(s"Error clearing write ahead log file $logInfo", ex)
-        }
-      }
-      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
-    }
-    if (!executionContext.isShutdown) {
-      val f = Future { deleteFiles() }
-      if (waitForCompletion) {
-        import scala.concurrent.duration._
-        Await.ready(f, 1 second)
-      }
-    }
-  }
-
-
-  /** Stop the manager, close any open log writer */
-  def stop(): Unit = synchronized {
-    if (currentLogWriter != null) {
-      currentLogWriter.close()
-    }
-    executionContext.shutdown()
-    logInfo("Stopped write ahead log manager")
-  }
-
-  /** Get the current log writer while taking care of rotation */
-  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
-    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
-      resetWriter()
-      currentLogPath.foreach {
-        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
-      }
-      currentLogWriterStartTime = currentTime
-      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
-      val newLogPath = new Path(logDirectory,
-        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
-      currentLogPath = Some(newLogPath.toString)
-      currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
-    }
-    currentLogWriter
-  }
-
-  /** Initialize the log directory or recover existing logs inside the directory */
-  private def initializeOrRecover(): Unit = synchronized {
-    val logDirectoryPath = new Path(logDirectory)
-    val fileSystem =  HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
-
-    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
-      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
-      pastLogs.clear()
-      pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
-      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
-    }
-  }
-
-  private def resetWriter(): Unit = synchronized {
-    if (currentLogWriter != null) {
-      currentLogWriter.close()
-      currentLogWriter = null
-    }
-  }
-}
-
-private[util] object WriteAheadLogManager {
-
-  case class LogInfo(startTime: Long, endTime: Long, path: String)
-
-  val logFileRegex = """log-(\d+)-(\d+)""".r
-
-  def timeToLogFile(startTime: Long, stopTime: Long): String = {
-    s"log-$startTime-$stopTime"
-  }
-
-  /** Convert a sequence of files to a sequence of sorted LogInfo objects */
-  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
-    files.flatMap { file =>
-      logFileRegex.findFirstIn(file.getName()) match {
-        case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
-          val startTime = startTimeStr.toLong
-          val stopTime = stopTimeStr.toLong
-          Some(LogInfo(startTime, stopTime, file.toString))
-        case None =>
-          None
-      }
-    }.sortBy { _.startTime }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
deleted file mode 100644
index 0039890..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io.Closeable
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-
-/**
- * A random access reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
- * this reads the record (bytebuffer) from the log file.
- */
-private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
-  extends Closeable {
-
-  private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
-
-  def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
-    assertOpen()
-    instream.seek(segment.offset)
-    val nextLength = instream.readInt()
-    HdfsUtils.checkState(nextLength == segment.length,
-      s"Expected message length to be ${segment.length}, but was $nextLength")
-    val buffer = new Array[Byte](nextLength)
-    instream.readFully(buffer)
-    ByteBuffer.wrap(buffer)
-  }
-
-  override def close(): Unit = synchronized {
-    closed = true
-    instream.close()
-  }
-
-  private def assertOpen() {
-    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
deleted file mode 100644
index 2afc0d1..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io.{Closeable, EOFException}
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
-
-/**
- * A reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
- * the records (bytebuffers) in the log file sequentially and return them as an
- * iterator of bytebuffers.
- */
-private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
-  extends Iterator[ByteBuffer] with Closeable with Logging {
-
-  private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
-  private var nextItem: Option[ByteBuffer] = None
-
-  override def hasNext: Boolean = synchronized {
-    if (closed) {
-      return false
-    }
-
-    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
-      true
-    } else {
-      try {
-        val length = instream.readInt()
-        val buffer = new Array[Byte](length)
-        instream.readFully(buffer)
-        nextItem = Some(ByteBuffer.wrap(buffer))
-        logTrace("Read next item " + nextItem.get)
-        true
-      } catch {
-        case e: EOFException =>
-          logDebug("Error reading next item, EOF reached", e)
-          close()
-          false
-        case e: Exception =>
-          logWarning("Error while trying to read data from HDFS.", e)
-          close()
-          throw e
-      }
-    }
-  }
-
-  override def next(): ByteBuffer = synchronized {
-    val data = nextItem.getOrElse {
-      close()
-      throw new IllegalStateException(
-        "next called without calling hasNext or after hasNext returned false")
-    }
-    nextItem = None // Ensure the next hasNext call loads new data.
-    data
-  }
-
-  override def close(): Unit = synchronized {
-    if (!closed) {
-      instream.close()
-    }
-    closed = true
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
new file mode 100644
index 0000000..7f6ff12
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SparkException}
+
+/** A helper class with utility functions related to the WriteAheadLog interface */
+private[streaming] object WriteAheadLogUtils extends Logging {
+  val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
+  val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
+  val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
+  val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
+
+  val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
+  val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
+  val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
+
+  val DEFAULT_ROLLING_INTERVAL_SECS = 60
+  val DEFAULT_MAX_FAILURES = 3
+
+  def enableReceiverLog(conf: SparkConf): Boolean = {
+    conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
+  }
+
+  def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = {
+    if (isDriver) {
+      conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+    } else {
+      conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+    }
+  }
+
+  def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = {
+    if (isDriver) {
+      conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+    } else {
+      conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+    }
+  }
+
+  /**
+   * Create a WriteAheadLog for the driver. If configured with custom WAL class, it will try
+   * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
+   */
+  def createLogForDriver(
+      sparkConf: SparkConf,
+      fileWalLogDirectory: String,
+      fileWalHadoopConf: Configuration
+    ): WriteAheadLog = {
+    createLog(true, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+  }
+
+  /**
+   * Create a WriteAheadLog for the receiver. If configured with custom WAL class, it will try
+   * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
+   */
+  def createLogForReceiver(
+      sparkConf: SparkConf,
+      fileWalLogDirectory: String,
+      fileWalHadoopConf: Configuration
+    ): WriteAheadLog = {
+    createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+  }
+
+  /**
+   * Create a WriteAheadLog based on the value of the given config key. The config key is used
+   * to get the class name from the SparkConf. If the class is configured, it will try to
+   * create instance of that class by first trying `new CustomWAL(sparkConf)` then trying
+   * `new CustomWAL()`. If both fail, it throws a SparkException. If no class is configured, then
+   * it will create the default FileBasedWriteAheadLog.
+   */
+  private def createLog(
+      isDriver: Boolean,
+      sparkConf: SparkConf,
+      fileWalLogDirectory: String,
+      fileWalHadoopConf: Configuration
+    ): WriteAheadLog = {
+
+    val classNameOption = if (isDriver) {
+      sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)
+    } else {
+      sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)
+    }
+    classNameOption.map { className =>
+      try {
+        instantiateClass(
+          Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
+      } catch {
+        case NonFatal(e) =>
+          throw new SparkException(s"Could not create a write ahead log of class $className", e)
+      }
+    }.getOrElse {
+      new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
+        getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver))
+    }
+  }
+
+  /** Instantiate the class, either using single arg constructor or zero arg constructor */
+  private def instantiateClass(cls: Class[_ <: WriteAheadLog], conf: SparkConf): WriteAheadLog = {
+    try {
+      cls.getConstructor(classOf[SparkConf]).newInstance(conf)
+    } catch {
+      case nsme: NoSuchMethodException =>
+        cls.getConstructor().newInstance()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
deleted file mode 100644
index 679f6a6..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io._
-import java.net.URI
-import java.nio.ByteBuffer
-
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
-
-/**
- * A writer for writing byte-buffers to a write ahead log file.
- */
-private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
-  extends Closeable {
-
-  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
-
-  private lazy val hadoopFlushMethod = {
-    // Use reflection to get the right flush operation
-    val cls = classOf[FSDataOutputStream]
-    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
-  }
-
-  private var nextOffset = stream.getPos()
-  private var closed = false
-
-  /** Write the bytebuffer to the log file */
-  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
-    assertOpen()
-    data.rewind() // Rewind to ensure all data in the buffer is retrieved
-    val lengthToWrite = data.remaining()
-    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
-    stream.writeInt(lengthToWrite)
-    if (data.hasArray) {
-      stream.write(data.array())
-    } else {
-      // If the buffer is not backed by an array, we transfer using temp array
-      // Note that despite the extra array copy, this should be faster than byte-by-byte copy
-      while (data.hasRemaining) {
-        val array = new Array[Byte](data.remaining)
-        data.get(array)
-        stream.write(array)
-      }
-    }
-    flush()
-    nextOffset = stream.getPos()
-    segment
-  }
-
-  override def close(): Unit = synchronized {
-    closed = true
-    stream.close()
-  }
-
-  private def flush() {
-    hadoopFlushMethod.foreach { _.invoke(stream) }
-    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
-    stream.getWrappedStream.flush()
-  }
-
-  private def assertOpen() {
-    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
new file mode 100644
index 0000000..50e8f9f
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.util.WriteAheadLog;
+import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
+import org.apache.spark.streaming.util.WriteAheadLogUtils;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
+  int index = -1;
+  public JavaWriteAheadLogSuiteHandle(int idx) {
+    index = idx;
+  }
+}
+
+public class JavaWriteAheadLogSuite extends WriteAheadLog {
+
+  class Record {
+    long time;
+    int index;
+    ByteBuffer buffer;
+
+    public Record(long tym, int idx, ByteBuffer buf) {
+      index = idx;
+      time = tym;
+      buffer = buf;
+    }
+  }
+  private int index = -1;
+  private ArrayList<Record> records = new ArrayList<Record>();
+
+
+  // Methods for WriteAheadLog
+  @Override
+  public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) {
+    index += 1;
+    records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record));
+    return new JavaWriteAheadLogSuiteHandle(index);
+  }
+
+  @Override
+  public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) {
+    if (handle instanceof JavaWriteAheadLogSuiteHandle) {
+      int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index;
+      for (Record record: records) {
+        if (record.index == reqdIndex) {
+          return record.buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public java.util.Iterator<java.nio.ByteBuffer> readAll() {
+    Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() {
+      @Override
+      public Object transform(Object input) {
+        return ((Record) input).buffer;
+      }
+    });
+    return buffers.iterator();
+  }
+
+  @Override
+  public void clean(long threshTime, boolean waitForCompletion) {
+    for (int i = 0; i < records.size(); i++) {
+      if (records.get(i).time < threshTime) {
+        records.remove(i);
+        i--;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    records.clear();
+  }
+
+  @Test
+  public void testCustomWAL() {
+    SparkConf conf = new SparkConf();
+    conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+    WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
+
+    String data1 = "data1";
+    WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234);
+    Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
+    Assert.assertTrue(new String(wal.read(handle).array()).equals(data1));
+
+    wal.write(ByteBuffer.wrap("data2".getBytes()), 1235);
+    wal.write(ByteBuffer.wrap("data3".getBytes()), 1236);
+    wal.write(ByteBuffer.wrap("data4".getBytes()), 1237);
+    wal.clean(1236, false);
+
+    java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll();
+    ArrayList<String> readData = new ArrayList<String>();
+    while (dataIterator.hasNext()) {
+      readData.add(new String(dataIterator.next().array()));
+    }
+    Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index c090eae..2380423 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -43,7 +43,7 @@ import WriteAheadLogSuite._
 
 class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
-  val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+  val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
   val hadoopConf = new Configuration()
   val storageLevel = StorageLevel.MEMORY_ONLY_SER
   val streamId = 1
@@ -130,10 +130,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
           "Unexpected store result type"
         )
         // Verify the data in write ahead log files is correct
-        val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
-        val loggedData = fileSegments.flatMap { segment =>
-          val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
-          val bytes = reader.read(segment)
+        val walSegments = storeResults.map { result =>
+          result.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
+        }
+        val loggedData = walSegments.flatMap { walSegment =>
+          val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment]
+          val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+          val bytes = reader.read(fileSegment)
           reader.close()
           blockManager.dataDeserialize(generateBlockId(), bytes).toList
         }
@@ -148,13 +151,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
     }
   }
 
-  test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+  test("WriteAheadLogBasedBlockHandler - clean old blocks") {
     withWriteAheadLogBasedBlockHandler { handler =>
       val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
       storeBlocks(handler, blocks)
 
       val preCleanupLogFiles = getWriteAheadLogFiles()
-      preCleanupLogFiles.size should be > 1
+      require(preCleanupLogFiles.size > 1)
 
       // this depends on the number of blocks inserted using generateAndStoreData()
       manualClock.getTimeMillis() shouldEqual 5000L
@@ -218,6 +221,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
 
   /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
   private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
+    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
     val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
       storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org