Posted to commits@spark.apache.org by td...@apache.org on 2015/04/29 22:06:23 UTC
[2/2] spark git commit: [SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable
[SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable
Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan.
https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing
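As an illustration of what "pluggable" means here: the implementation is selected purely through configuration. A minimal sketch, not part of this diff; the class-name keys below are assumed to be the ones consolidated into WriteAheadLogUtils by this change, and com.example.MyWriteAheadLog is a hypothetical implementation:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Enable the receiver-side WAL, as before this change
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Hypothetical custom implementation of the new pluggable interface;
      // if unset, the default FileBasedWriteAheadLog is used
      .set("spark.streaming.receiver.writeAheadLog.class", "com.example.MyWriteAheadLog")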
Things to add:
* Unit tests for WriteAheadLogUtils
Author: Tathagata Das <ta...@gmail.com>
Closes #5645 from tdas/wal-pluggable and squashes the following commits:
2c431fd [Tathagata Das] Minor fixes.
c2bc7384 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1868bd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1868bd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1868bd40
Branch: refs/heads/master
Commit: 1868bd40dcce23990b98748b0239bd00452b1ca5
Parents: c0c0ba6
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Apr 29 13:06:11 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 29 13:06:11 2015 -0700
----------------------------------------------------------------------
.../spark/streaming/kafka/KafkaUtils.scala | 3 +-
.../spark/streaming/util/WriteAheadLog.java | 60 +++++
.../util/WriteAheadLogRecordHandle.java | 30 +++
.../dstream/ReceiverInputDStream.scala | 2 +-
.../rdd/WriteAheadLogBackedBlockRDD.scala | 79 ++++--
.../receiver/ReceivedBlockHandler.scala | 38 ++-
.../receiver/ReceiverSupervisorImpl.scala | 5 +-
.../scheduler/ReceivedBlockTracker.scala | 38 ++-
.../streaming/scheduler/ReceiverTracker.scala | 3 +-
.../streaming/util/FileBasedWriteAheadLog.scala | 249 +++++++++++++++++++
.../FileBasedWriteAheadLogRandomReader.scala | 54 ++++
.../util/FileBasedWriteAheadLogReader.scala | 82 ++++++
.../util/FileBasedWriteAheadLogSegment.scala | 21 ++
.../util/FileBasedWriteAheadLogWriter.scala | 81 ++++++
.../util/WriteAheadLogFileSegment.scala | 20 --
.../streaming/util/WriteAheadLogManager.scala | 233 -----------------
.../util/WriteAheadLogRandomReader.scala | 54 ----
.../streaming/util/WriteAheadLogReader.scala | 82 ------
.../streaming/util/WriteAheadLogUtils.scala | 129 ++++++++++
.../streaming/util/WriteAheadLogWriter.scala | 82 ------
.../spark/streaming/JavaWriteAheadLogSuite.java | 129 ++++++++++
.../streaming/ReceivedBlockHandlerSuite.scala | 18 +-
.../streaming/ReceivedBlockTrackerSuite.scala | 28 ++-
.../apache/spark/streaming/ReceiverSuite.scala | 2 +-
.../spark/streaming/StreamingContextSuite.scala | 4 +-
.../rdd/WriteAheadLogBackedBlockRDDSuite.scala | 31 +--
.../streaming/util/WriteAheadLogSuite.scala | 194 ++++++++++-----
27 files changed, 1115 insertions(+), 636 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 0721dda..d7cf500 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -31,6 +31,7 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
@@ -80,7 +81,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
- val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+ val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
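WriteAheadLogUtils itself is added elsewhere in this commit (its body is not shown in this part of the diff), but judging from the inline check it replaces here, enableReceiverLog presumably reduces to a sketch like:

    import org.apache.spark.SparkConf

    // Sketch only: mirrors the check the call sites used to inline.
    def enableReceiverLog(conf: SparkConf): Boolean =
      conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)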
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
new file mode 100644
index 0000000..8c0fdfa
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
+ * to save the data received by receivers, along with the associated metadata, to reliable storage,
+ * so that it can be recovered after driver failures. See the Spark documentation for more
+ * information on how to plug in your own custom implementation of a write ahead log.
+ */
+@org.apache.spark.annotation.DeveloperApi
+public abstract class WriteAheadLog {
+ /**
+ * Write the record to the log and return a record handle, which contains all the information
+ * necessary to read back the written record. The time is used to index the record,
+ * such that it can be cleaned later. Note that implementations of this abstract class must
+ * ensure that the written data is durable and readable (using the record handle) by the
+ * time this function returns.
+ */
+ abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);
+
+ /**
+ * Read a written record based on the given record handle.
+ */
+ abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);
+
+ /**
+ * Read and return an iterator of all the records that have been written but not yet cleaned up.
+ */
+ abstract public Iterator<ByteBuffer> readAll();
+
+ /**
+ * Clean all the records that are older than the threshold time. If waitForCompletion is
+ * true, this method returns only after the deletion has completed.
+ */
+ abstract public void clean(long threshTime, boolean waitForCompletion);
+
+ /**
+ * Close this log and release any resources.
+ */
+ abstract public void close();
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
new file mode 100644
index 0000000..0232418
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogRecordHandle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util;
+
+/**
+ * This abstract class represents a handle that refers to a record written in a
+ * {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
+ * It must contain all the information necessary for the record to be read and returned by
+ * an implementation of the WriteAheadLog class.
+ *
+ * @see org.apache.spark.streaming.util.WriteAheadLog
+ */
+@org.apache.spark.annotation.DeveloperApi
+public abstract class WriteAheadLogRecordHandle implements java.io.Serializable {
+}
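To make the contract of these two abstract classes concrete, here is a minimal sketch (not part of this commit) of a custom pair: an in-memory log plus its record handle. All names are invented for illustration, and a real implementation would of course have to be durable:

    import java.nio.ByteBuffer
    import java.util.{Iterator => JIterator}

    import scala.collection.JavaConversions._
    import scala.collection.mutable

    import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

    // Hypothetical handle: just the write time, used as the lookup key.
    case class InMemoryRecordHandle(time: Long) extends WriteAheadLogRecordHandle

    // Hypothetical, non-durable WAL that shows the API shape only.
    // Assumes unique, increasing write times for simplicity.
    class InMemoryWriteAheadLog extends WriteAheadLog {
      private val records = new mutable.LinkedHashMap[Long, ByteBuffer]

      override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle =
        synchronized {
          records(time) = record
          InMemoryRecordHandle(time)
        }

      override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
        records(handle.asInstanceOf[InMemoryRecordHandle].time)
      }

      override def readAll(): JIterator[ByteBuffer] = synchronized {
        // JavaConversions implicitly wraps the Scala iterator as java.util.Iterator
        records.values.toSeq.iterator
      }

      override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
        records.retain { case (time, _) => time >= threshTime }
      }

      override def close(): Unit = synchronized { records.clear() }
    }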
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 8be0431..4c7fd2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
- _.asInstanceOf[WriteAheadLogBasedStoreResult].segment
+ _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 93caa4b..ebdf418 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,14 +16,17 @@
*/
package org.apache.spark.streaming.rdd
+import java.nio.ByteBuffer
+
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
+import org.apache.commons.io.FileUtils
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}
+import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -31,26 +34,27 @@ import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, Wri
* the segment of the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
- * @param segment segment of the write ahead log having the partition data
+ * @param walRecordHandle handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
- val segment: WriteAheadLogFileSegment)
+ val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
/**
* This class represents a special case of the BlockRDD where the data blocks in
- * the block manager are also backed by segments in write ahead logs. For reading
+ * the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment.
+ * If it does not find them, it looks up the corresponding data in the write ahead log.
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contain this RDD's data
- * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
+ * @param storeInBlockManager Whether to store in the block manager after reading
+ * from the WAL record
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -58,15 +62,15 @@ private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
- @transient segments: Array[WriteAheadLogFileSegment],
+ @transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {
require(
- blockIds.length == segments.length,
+ blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
- s"the same as number of segments (${segments.length}})!")
+ s"the same as number of WAL record handles (${walRecordHandles.length}})!")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -75,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
+ new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
}
}
/**
* Gets the partition data by getting the corresponding block from the block manager.
- * If the block does not exist, then the data is read from the corresponding segment
+ * If the block does not exist, then the data is read from the corresponding record
* in write ahead log files.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -96,10 +100,35 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
- val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
- val dataRead = reader.read(partition.segment)
- reader.close()
- logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
+ var dataRead: ByteBuffer = null
+ var writeAheadLog: WriteAheadLog = null
+ try {
+ // The WriteAheadLogUtils.createLog*** method needs a directory to create a
+ // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
+ // writing log data. However, the directory is not needed if data needs to be read, hence
+ // a dummy path is provided to satisfy the method parameter requirements.
+ // FileBasedWriteAheadLog will not create any file or directory at that path.
+ val dummyDirectory = FileUtils.getTempDirectoryPath()
+ writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+ SparkEnv.get.conf, dummyDirectory, hadoopConf)
+ dataRead = writeAheadLog.read(partition.walRecordHandle)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException(
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
+ } finally {
+ if (writeAheadLog != null) {
+ writeAheadLog.close()
+ writeAheadLog = null
+ }
+ }
+ if (dataRead == null) {
+ throw new SparkException(
+ s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
+ s"read returned null")
+ }
+ logInfo(s"Read partition data of $this from write ahead log, record handle " +
+ partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -111,14 +140,20 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
/**
* Get the preferred location of the partition. This returns the locations of the block
- * if it is present in the block manager, else it returns the location of the
- * corresponding segment in HDFS.
+ * if it is present in the block manager, else, if a FileBasedWriteAheadLogSegment is used,
+ * it returns the location of the corresponding file segment in HDFS.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
- blockLocations.getOrElse(
- HdfsUtils.getFileSegmentLocations(
- partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
+ blockLocations.getOrElse {
+ partition.walRecordHandle match {
+ case fileSegment: FileBasedWriteAheadLogSegment =>
+ HdfsUtils.getFileSegmentLocations(
+ fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
+ case _ =>
+ Seq.empty
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 297bf04..4b3d9ee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,18 +17,18 @@
package org.apache.spark.streaming.receiver
-import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.{existentials, postfixOps}
-import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
-import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
+import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
- segment: WriteAheadLogFileSegment
+ walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult
@@ -116,10 +116,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
- private val rollingInterval = conf.getInt(
- "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
- private val maxFailures = conf.getInt(
- "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
@@ -139,13 +135,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
s"$effectiveStorageLevel when write ahead log is enabled")
}
- // Manages rolling log files
- private val logManager = new WriteAheadLogManager(
- checkpointDirToLogDir(checkpointDir, streamId),
- hadoopConf, rollingInterval, maxFailures,
- callerName = this.getClass.getSimpleName,
- clock = clock
- )
+ // Write ahead log that the received blocks are written to
+ private val writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
+ conf, checkpointDirToLogDir(checkpointDir, streamId), hadoopConf)
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
@@ -183,21 +175,21 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
- logManager.writeToLog(serializedBlock)
+ writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
- // Combine the futures, wait for both to complete, and return the write ahead log segment
+ // Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
- val segment = Await.result(combinedFuture, blockStoreTimeout)
- WriteAheadLogBasedStoreResult(blockId, segment)
+ val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
+ WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
}
def cleanupOldBlocks(threshTime: Long) {
- logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
+ writeAheadLog.clean(threshTime, false)
}
def stop() {
- logManager.stop()
+ writeAheadLog.close()
}
}
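The write path above stores the block in the block manager and in the WAL in parallel, then keeps only the WAL record handle. Condensed to its essentials, the future-combining pattern looks like this sketch (simplified names, generic payloads):

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    implicit val ec: ExecutionContext = ExecutionContext.global

    def storeBoth[A, B](storeInBlockManager: => A, storeInWal: => B): B = {
      val bmFuture  = Future { storeInBlockManager }
      val walFuture = Future { storeInWal }
      // zip pairs the two results; map(_._2) keeps only the WAL result,
      // but the Await still fails if either store fails or times out.
      Await.result(bmFuture.zip(walFuture).map(_._2), 30.seconds)
    }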
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index f237936..93f047b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -25,12 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -46,7 +47,7 @@ private[streaming] class ReceiverSupervisorImpl(
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
private val receivedBlockHandler: ReceivedBlockHandler = {
- if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 200cf4e..14e769a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -25,10 +25,10 @@ import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.streaming.Time
-import org.apache.spark.streaming.util.WriteAheadLogManager
+import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -70,7 +70,7 @@ private[streaming] class ReceivedBlockTracker(
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
- private val logManagerOption = createLogManager()
+ private val writeAheadLogOption = createWriteAheadLog()
private var lastAllocatedBatchTime: Time = null
@@ -155,12 +155,12 @@ private[streaming] class ReceivedBlockTracker(
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
- logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
+ writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
}
/** Stop the block tracker. */
def stop() {
- logManagerOption.foreach { _.stop() }
+ writeAheadLogOption.foreach { _.close() }
}
/**
@@ -190,9 +190,10 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks --= batchTimes
}
- logManagerOption.foreach { logManager =>
+ writeAheadLogOption.foreach { writeAheadLog =>
logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
- logManager.readFromLog().foreach { byteBuffer =>
+ import scala.collection.JavaConversions._
+ writeAheadLog.readAll().foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
case BlockAdditionEvent(receivedBlockInfo) =>
@@ -208,10 +209,10 @@ private[streaming] class ReceivedBlockTracker(
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent) {
- if (isLogManagerEnabled) {
+ if (isWriteAheadLogEnabled) {
logDebug(s"Writing to log $record")
- logManagerOption.foreach { logManager =>
- logManager.writeToLog(ByteBuffer.wrap(Utils.serialize(record)))
+ writeAheadLogOption.foreach { logManager =>
+ logManager.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis())
}
}
}
@@ -222,8 +223,8 @@ private[streaming] class ReceivedBlockTracker(
}
/** Optionally create the write ahead log manager only if the feature is enabled */
- private def createLogManager(): Option[WriteAheadLogManager] = {
- if (conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ private def createWriteAheadLog(): Option[WriteAheadLog] = {
+ if (WriteAheadLogUtils.enableReceiverLog(conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
@@ -231,19 +232,16 @@ private[streaming] class ReceivedBlockTracker(
"See documentation for more details.")
}
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
- val rollingIntervalSecs = conf.getInt(
- "spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60)
- val logManager = new WriteAheadLogManager(logDir, hadoopConf,
- rollingIntervalSecs = rollingIntervalSecs, clock = clock,
- callerName = "ReceivedBlockHandlerMaster")
- Some(logManager)
+
+ val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
+ Some(log)
} else {
None
}
}
- /** Check if the log manager is enabled. This is only used for testing purposes. */
- private[streaming] def isLogManagerEnabled: Boolean = logManagerOption.nonEmpty
+ /** Check if the write ahead log is enabled. This is only used for testing purposes. */
+ private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
}
private[streaming] object ReceivedBlockTracker {
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index c4ead6f..1af6571 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.language.existentials
+import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -125,7 +126,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
// Signal the receivers to delete old block data
- if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
receiverInfo.values.flatMap { info => Option(info.endpoint) }
.foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
new file mode 100644
index 0000000..9985fed
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+import java.util.{Iterator => JIterator}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{Logging, SparkConf}
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where the rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ */
+private[streaming] class FileBasedWriteAheadLog(
+ conf: SparkConf,
+ logDirectory: String,
+ hadoopConf: Configuration,
+ rollingIntervalSecs: Int,
+ maxFailures: Int
+ ) extends WriteAheadLog with Logging {
+
+ import FileBasedWriteAheadLog._
+
+ private val pastLogs = new ArrayBuffer[LogInfo]
+ private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
+
+ private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+ implicit private val executionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+ override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+ private var currentLogPath: Option[String] = None
+ private var currentLogWriter: FileBasedWriteAheadLogWriter = null
+ private var currentLogWriterStartTime: Long = -1L
+ private var currentLogWriterStopTime: Long = -1L
+
+ initializeOrRecover()
+
+ /**
+ * Write a byte buffer to the log file. This method synchronously writes the data in the
+ * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+ * to HDFS, and will be available for readers to read.
+ */
+ def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
+ var fileSegment: FileBasedWriteAheadLogSegment = null
+ var failures = 0
+ var lastException: Exception = null
+ var succeeded = false
+ while (!succeeded && failures < maxFailures) {
+ try {
+ fileSegment = getLogWriter(time).write(byteBuffer)
+ succeeded = true
+ } catch {
+ case ex: Exception =>
+ lastException = ex
+ logWarning("Failed to write to write ahead log")
+ resetWriter()
+ failures += 1
+ }
+ }
+ if (fileSegment == null) {
+ logError(s"Failed to write to write ahead log after $failures failures")
+ throw lastException
+ }
+ fileSegment
+ }
+
+ def read(segment: WriteAheadLogRecordHandle): ByteBuffer = {
+ val fileSegment = segment.asInstanceOf[FileBasedWriteAheadLogSegment]
+ var reader: FileBasedWriteAheadLogRandomReader = null
+ var byteBuffer: ByteBuffer = null
+ try {
+ reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+ byteBuffer = reader.read(fileSegment)
+ } finally {
+ reader.close()
+ }
+ byteBuffer
+ }
+
+ /**
+ * Read all the existing logs from the log directory.
+ *
+ * Note that this is typically called when the caller is initializing and wants
+ * to recover past state from the write ahead logs (that is, before making any writes).
+ * If this is called after writes have been made using this manager, then it may not return
+ * the latest records. This does not deal with currently active log files, and
+ * hence the implementation is kept simple.
+ */
+ def readAll(): JIterator[ByteBuffer] = synchronized {
+ import scala.collection.JavaConversions._
+ val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
+ logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+
+ logFilesToRead.iterator.map { file =>
+ logDebug(s"Creating log reader with $file")
+ new FileBasedWriteAheadLogReader(file, hadoopConf)
+ } flatMap { x => x }
+ }
+
+ /**
+ * Delete the log files that are older than the threshold time.
+ *
+ * It's important to note that the threshold time is based on the time stamps used in the log
+ * files, which are usually based on the local system time. So if coordination is necessary
+ * between the node calculating the threshTime (say, the driver node) and the nodes writing
+ * with their local system time (say, the worker nodes), the caller has to account for time skew.
+ *
+ * If waitForCompletion is set to true, this method will return only after old logs have been
+ * deleted. This should be set to true only for testing. Else the files will be deleted
+ * asynchronously.
+ */
+ def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
+ val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+ logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+ s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+ def deleteFiles() {
+ oldLogFiles.foreach { logInfo =>
+ try {
+ val path = new Path(logInfo.path)
+ val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+ fs.delete(path, true)
+ synchronized { pastLogs -= logInfo }
+ logDebug(s"Cleared log file $logInfo")
+ } catch {
+ case ex: Exception =>
+ logWarning(s"Error clearing write ahead log file $logInfo", ex)
+ }
+ }
+ logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+ }
+ if (!executionContext.isShutdown) {
+ val f = Future { deleteFiles() }
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
+ }
+ }
+
+
+ /** Stop the manager, close any open log writer */
+ def close(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ }
+ executionContext.shutdown()
+ logInfo("Stopped write ahead log manager")
+ }
+
+ /** Get the current log writer while taking care of rotation */
+ private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
+ if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+ resetWriter()
+ currentLogPath.foreach {
+ pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
+ }
+ currentLogWriterStartTime = currentTime
+ currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+ val newLogPath = new Path(logDirectory,
+ timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
+ currentLogPath = Some(newLogPath.toString)
+ currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
+ }
+ currentLogWriter
+ }
+
+ /** Initialize the log directory or recover existing logs inside the directory */
+ private def initializeOrRecover(): Unit = synchronized {
+ val logDirectoryPath = new Path(logDirectory)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+ if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ }
+
+ private def resetWriter(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ currentLogWriter = null
+ }
+ }
+}
+
+private[streaming] object FileBasedWriteAheadLog {
+
+ case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+ val logFileRegex = """log-(\d+)-(\d+)""".r
+
+ def timeToLogFile(startTime: Long, stopTime: Long): String = {
+ s"log-$startTime-$stopTime"
+ }
+
+ def getCallerName(): Option[String] = {
+ val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName)
+ stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption)
+ }
+
+ /** Convert a sequence of files to a sequence of sorted LogInfo objects */
+ def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+ files.flatMap { file =>
+ logFileRegex.findFirstIn(file.getName()) match {
+ case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
+ val startTime = startTimeStr.toLong
+ val stopTime = stopTimeStr.toLong
+ Some(LogInfo(startTime, stopTime, file.toString))
+ case None =>
+ None
+ }
+ }.sortBy { _.startTime }
+ }
+}
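A usage sketch for the file-based implementation above (illustrative only: the class is private[streaming], so code like this would have to live in a matching package, and the directory and timestamps are made up):

    import java.nio.ByteBuffer

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    val wal = new FileBasedWriteAheadLog(
      new SparkConf(), "/tmp/wal-example", new Configuration(),
      rollingIntervalSecs = 60, maxFailures = 3)

    // write() frames the record into the current log file (rotated every
    // rollingIntervalSecs) and returns a FileBasedWriteAheadLogSegment.
    val handle = wal.write(ByteBuffer.wrap("hello".getBytes("UTF-8")), System.currentTimeMillis())
    val recovered = wal.read(handle)    // random access via (path, offset, length)

    val all = wal.readAll()             // java.util.Iterator over all surviving records

    // Deletes log files whose end time is below the threshold.
    wal.clean(System.currentTimeMillis(), waitForCompletion = true)
    wal.close()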
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
new file mode 100644
index 0000000..f716822
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.Closeable
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (ByteBuffer) from the log file.
+ */
+private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
+ extends Closeable {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+
+ def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
+ assertOpen()
+ instream.seek(segment.offset)
+ val nextLength = instream.readInt()
+ HdfsUtils.checkState(nextLength == segment.length,
+ s"Expected message length to be ${segment.length}, but was $nextLength")
+ val buffer = new Array[Byte](nextLength)
+ instream.readFully(buffer)
+ ByteBuffer.wrap(buffer)
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ instream.close()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
new file mode 100644
index 0000000..c3bb59f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Configuration)
+ extends Iterator[ByteBuffer] with Closeable with Logging {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+ private var nextItem: Option[ByteBuffer] = None
+
+ override def hasNext: Boolean = synchronized {
+ if (closed) {
+ return false
+ }
+
+ if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+ true
+ } else {
+ try {
+ val length = instream.readInt()
+ val buffer = new Array[Byte](length)
+ instream.readFully(buffer)
+ nextItem = Some(ByteBuffer.wrap(buffer))
+ logTrace("Read next item " + nextItem.get)
+ true
+ } catch {
+ case e: EOFException =>
+ logDebug("Error reading next item, EOF reached", e)
+ close()
+ false
+ case e: Exception =>
+ logWarning("Error while trying to read data from HDFS.", e)
+ close()
+ throw e
+ }
+ }
+ }
+
+ override def next(): ByteBuffer = synchronized {
+ val data = nextItem.getOrElse {
+ close()
+ throw new IllegalStateException(
+ "next called without calling hasNext or after hasNext returned false")
+ }
+ nextItem = None // Ensure the next hasNext call loads new data.
+ data
+ }
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ instream.close()
+ }
+ closed = true
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
new file mode 100644
index 0000000..2e1f152
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogSegment.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
+ extends WriteAheadLogRecordHandle
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
new file mode 100644
index 0000000..e146bec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FSDataOutputStream
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: Configuration)
+ extends Closeable {
+
+ private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
+
+ private lazy val hadoopFlushMethod = {
+ // Use reflection to get the right flush operation
+ val cls = classOf[FSDataOutputStream]
+ Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+ }
+
+ private var nextOffset = stream.getPos()
+ private var closed = false
+
+ /** Write the bytebuffer to the log file */
+ def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
+ assertOpen()
+ data.rewind() // Rewind to ensure all data in the buffer is retrieved
+ val lengthToWrite = data.remaining()
+ val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
+ stream.writeInt(lengthToWrite)
+ if (data.hasArray) {
+ stream.write(data.array())
+ } else {
+ // If the buffer is not backed by an array, we transfer it using a temporary array.
+ // Note that despite the extra array copy, this should be faster than byte-by-byte copy
+ while (data.hasRemaining) {
+ val array = new Array[Byte](data.remaining)
+ data.get(array)
+ stream.write(array)
+ }
+ }
+ flush()
+ nextOffset = stream.getPos()
+ segment
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ stream.close()
+ }
+
+ private def flush() {
+ hadoopFlushMethod.foreach { _.invoke(stream) }
+ // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+ stream.getWrappedStream.flush()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+ }
+}
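The writer above and the two readers agree on a simple on-disk framing: each record is a 4-byte length (a DataOutputStream-style big-endian int written by writeInt) followed by the payload bytes. A standalone sketch of scanning that format with plain java.io in place of HdfsUtils (this module's HDFS helper), for illustration only:

    import java.io.{DataInputStream, EOFException, FileInputStream}
    import java.nio.ByteBuffer

    // Reads length-prefixed records, mirroring FileBasedWriteAheadLogReader's loop.
    def readRecords(path: String): Seq[ByteBuffer] = {
      val in = new DataInputStream(new FileInputStream(path))
      val records = Seq.newBuilder[ByteBuffer]
      try {
        while (true) {
          val length = in.readInt()          // 4-byte record length
          val buffer = new Array[Byte](length)
          in.readFully(buffer)               // then the payload
          records += ByteBuffer.wrap(buffer)
        }
      } catch {
        case _: EOFException => // end of log file reached
      } finally {
        in.close()
      }
      records.result()
    }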
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
deleted file mode 100644
index 1005a2c..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-/** Class for representing a segment of data in a write ahead log file */
-private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
deleted file mode 100644
index 38a93cc..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.language.postfixOps
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
-import WriteAheadLogManager._
-
-/**
- * This class manages write ahead log files.
- * - Writes records (bytebuffers) to periodically rotating log files.
- * - Recovers the log files and the reads the recovered records upon failures.
- * - Cleans up old log files.
- *
- * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
- * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
- *
- * @param logDirectory Directory when rotating log files will be created.
- * @param hadoopConf Hadoop configuration for reading/writing log files.
- * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
- * Default is one minute.
- * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
- * Default is three.
- * @param callerName Optional name of the class who is using this manager.
- * @param clock Optional clock that is used to check for rotation interval.
- */
-private[streaming] class WriteAheadLogManager(
- logDirectory: String,
- hadoopConf: Configuration,
- rollingIntervalSecs: Int = 60,
- maxFailures: Int = 3,
- callerName: String = "",
- clock: Clock = new SystemClock
- ) extends Logging {
-
- private val pastLogs = new ArrayBuffer[LogInfo]
- private val callerNameTag =
- if (callerName.nonEmpty) s" for $callerName" else ""
- private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
- implicit private val executionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
- override protected val logName = s"WriteAheadLogManager $callerNameTag"
-
- private var currentLogPath: Option[String] = None
- private var currentLogWriter: WriteAheadLogWriter = null
- private var currentLogWriterStartTime: Long = -1L
- private var currentLogWriterStopTime: Long = -1L
-
- initializeOrRecover()
-
- /**
- * Write a byte buffer to the log file. This method synchronously writes the data in the
- * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
- * to HDFS, and will be available for readers to read.
- */
- def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
- var fileSegment: WriteAheadLogFileSegment = null
- var failures = 0
- var lastException: Exception = null
- var succeeded = false
- while (!succeeded && failures < maxFailures) {
- try {
- fileSegment = getLogWriter(clock.getTimeMillis()).write(byteBuffer)
- succeeded = true
- } catch {
- case ex: Exception =>
- lastException = ex
- logWarning("Failed to write to write ahead log")
- resetWriter()
- failures += 1
- }
- }
- if (fileSegment == null) {
- logError(s"Failed to write to write ahead log after $failures failures")
- throw lastException
- }
- fileSegment
- }
-
- /**
- * Read all the existing logs from the log directory.
- *
- * Note that this is typically called when the caller is initializing and wants
- * to recover past state from the write ahead logs (that is, before making any writes).
- * If this is called after writes have been made using this manager, then it may not return
- * the latest records. This does not deal with currently active log files, and
- * hence the implementation is kept simple.
- */
- def readFromLog(): Iterator[ByteBuffer] = synchronized {
- val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
- logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
- logFilesToRead.iterator.map { file =>
- logDebug(s"Creating log reader with $file")
- new WriteAheadLogReader(file, hadoopConf)
- } flatMap { x => x }
- }
-
- /**
- * Delete the log files that are older than the threshold time.
- *
- * It's important to note that the threshold time is based on the timestamps used in the log
- * files, which are usually based on the local system time. So if any coordination is necessary
- * between the node calculating the threshTime (say, the driver node) and the nodes whose local
- * system time produced those timestamps (say, the worker nodes), the caller has to take
- * possible time skew into account.
- *
- * If waitForCompletion is set to true, this method will return only after old logs have been
- * deleted. This should be set to true only for testing; otherwise, the files will be deleted
- * asynchronously.
- */
- def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
- val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
- logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
- s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
-
- def deleteFiles() {
- oldLogFiles.foreach { logInfo =>
- try {
- val path = new Path(logInfo.path)
- val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
- fs.delete(path, true)
- synchronized { pastLogs -= logInfo }
- logDebug(s"Cleared log file $logInfo")
- } catch {
- case ex: Exception =>
- logWarning(s"Error clearing write ahead log file $logInfo", ex)
- }
- }
- logInfo(s"Cleared log files in $logDirectory older than $threshTime")
- }
- if (!executionContext.isShutdown) {
- val f = Future { deleteFiles() }
- if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
- }
- }
- }
-
-
- /** Stop the manager, close any open log writer */
- def stop(): Unit = synchronized {
- if (currentLogWriter != null) {
- currentLogWriter.close()
- }
- executionContext.shutdown()
- logInfo("Stopped write ahead log manager")
- }
-
- /** Get the current log writer while taking care of rotation */
- private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
- if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
- resetWriter()
- currentLogPath.foreach {
- pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
- }
- currentLogWriterStartTime = currentTime
- currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
- val newLogPath = new Path(logDirectory,
- timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
- currentLogPath = Some(newLogPath.toString)
- currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
- }
- currentLogWriter
- }
-
- /** Initialize the log directory or recover existing logs inside the directory */
- private def initializeOrRecover(): Unit = synchronized {
- val logDirectoryPath = new Path(logDirectory)
- val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
-
- if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
- val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
- pastLogs.clear()
- pastLogs ++= logFileInfo
- logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
- logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
- }
- }
-
- private def resetWriter(): Unit = synchronized {
- if (currentLogWriter != null) {
- currentLogWriter.close()
- currentLogWriter = null
- }
- }
-}
-
-private[util] object WriteAheadLogManager {
-
- case class LogInfo(startTime: Long, endTime: Long, path: String)
-
- val logFileRegex = """log-(\d+)-(\d+)""".r
-
- def timeToLogFile(startTime: Long, stopTime: Long): String = {
- s"log-$startTime-$stopTime"
- }
-
- /** Convert a sequence of files to a sequence of sorted LogInfo objects */
- def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
- files.flatMap { file =>
- logFileRegex.findFirstIn(file.getName()) match {
- case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
- val startTime = startTimeStr.toLong
- val stopTime = stopTimeStr.toLong
- Some(LogInfo(startTime, stopTime, file.toString))
- case None =>
- None
- }
- }.sortBy { _.startTime }
- }
-}
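
The manager above is replaced by FileBasedWriteAheadLog in this commit, but the name-based
rotation scheme it uses carries over: each file name encodes the time window of the records it
holds, so recovery and cleanup never have to open a file. A minimal, self-contained Scala sketch
of that convention (LogFileNaming is a hypothetical helper, not part of the patch):

    import scala.util.matching.Regex

    object LogFileNaming {
      // Same pattern as timeToLogFile/logFilesTologInfo above: log-<start>-<stop>
      val logFileRegex: Regex = """log-(\d+)-(\d+)""".r

      def fileFor(startTime: Long, stopTime: Long): String =
        s"log-$startTime-$stopTime"

      def parse(fileName: String): Option[(Long, Long)] = fileName match {
        case logFileRegex(start, stop) => Some((start.toLong, stop.toLong))
        case _                         => None
      }
    }

    // parse("log-1000-61000") == Some((1000L, 61000L)); cleanup can drop any file
    // whose stop time is older than the threshold without ever opening it.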
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
deleted file mode 100644
index 0039890..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io.Closeable
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-
-/**
- * A random access reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
- * this reads the record (bytebuffer) from the log file.
- */
-private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
- extends Closeable {
-
- private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
-
- def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
- assertOpen()
- instream.seek(segment.offset)
- val nextLength = instream.readInt()
- HdfsUtils.checkState(nextLength == segment.length,
- s"Expected message length to be ${segment.length}, but was $nextLength")
- val buffer = new Array[Byte](nextLength)
- instream.readFully(buffer)
- ByteBuffer.wrap(buffer)
- }
-
- override def close(): Unit = synchronized {
- closed = true
- instream.close()
- }
-
- private def assertOpen() {
- HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
- }
-}
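
The random reader depends entirely on the record framing the writer produces: an Int length
header followed by the payload, so a (path, offset, length) segment is sufficient to seek to and
read back a single record. A stand-alone sketch of that framing over in-memory streams
(FramingDemo and writeRecord are illustrative names, not Spark API):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object FramingDemo extends App {
      val out = new ByteArrayOutputStream()
      val dos = new DataOutputStream(out)

      // Write one length-prefixed record; return its (offset, length) "segment".
      def writeRecord(payload: Array[Byte]): (Long, Int) = {
        val offset = dos.size().toLong // the record starts at the current position
        dos.writeInt(payload.length)   // length header, as in the writer above
        dos.write(payload)
        (offset, payload.length)
      }

      val (offset, length) = writeRecord("hello wal".getBytes("UTF-8"))

      // Random read: seek to the offset, verify the header, read the payload,
      // mirroring WriteAheadLogRandomReader.read.
      val in = new DataInputStream(new ByteArrayInputStream(out.toByteArray))
      in.skipBytes(offset.toInt)
      val storedLength = in.readInt()
      require(storedLength == length, s"Expected length $length, but was $storedLength")
      val buffer = new Array[Byte](storedLength)
      in.readFully(buffer)
      println(new String(buffer, "UTF-8")) // prints "hello wal"
    }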
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
deleted file mode 100644
index 2afc0d1..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io.{Closeable, EOFException}
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
-
-/**
- * A reader for reading write ahead log files written using
- * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
- * the records (bytebuffers) in the log file sequentially and returns them as an
- * iterator of bytebuffers.
- */
-private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
- extends Iterator[ByteBuffer] with Closeable with Logging {
-
- private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
- private var nextItem: Option[ByteBuffer] = None
-
- override def hasNext: Boolean = synchronized {
- if (closed) {
- return false
- }
-
- if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
- true
- } else {
- try {
- val length = instream.readInt()
- val buffer = new Array[Byte](length)
- instream.readFully(buffer)
- nextItem = Some(ByteBuffer.wrap(buffer))
- logTrace("Read next item " + nextItem.get)
- true
- } catch {
- case e: EOFException =>
- logDebug("Error reading next item, EOF reached", e)
- close()
- false
- case e: Exception =>
- logWarning("Error while trying to read data from HDFS.", e)
- close()
- throw e
- }
- }
- }
-
- override def next(): ByteBuffer = synchronized {
- val data = nextItem.getOrElse {
- close()
- throw new IllegalStateException(
- "next called without calling hasNext or after hasNext returned false")
- }
- nextItem = None // Ensure the next hasNext call loads new data.
- data
- }
-
- override def close(): Unit = synchronized {
- if (!closed) {
- instream.close()
- }
- closed = true
- }
-}
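
The sequential reader is a textbook read-ahead iterator: hasNext prefetches one record and
caches it, and next() returns the cached record and clears the cache so the following hasNext
reads fresh data. The pattern in isolation, stripped of the HDFS and logging details (a
hypothetical sketch, not part of the patch):

    // Hypothetical distillation of the pattern used by WriteAheadLogReader above.
    abstract class ReadAheadIterator[T] extends Iterator[T] {
      private var nextItem: Option[T] = None

      /** Try to read one more item; None signals end of input. */
      protected def readNext(): Option[T]

      override def hasNext: Boolean = {
        // Handle hasNext being called repeatedly without next(): reuse the cache.
        if (nextItem.isEmpty) {
          nextItem = readNext()
        }
        nextItem.isDefined
      }

      override def next(): T = {
        val item = nextItem.getOrElse(throw new IllegalStateException(
          "next called without calling hasNext or after hasNext returned false"))
        nextItem = None // ensure the next hasNext call loads new data
        item
      }
    }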
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
new file mode 100644
index 0000000..7f6ff12
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SparkException}
+
+/** A helper object with utility functions related to the WriteAheadLog interface */
+private[streaming] object WriteAheadLogUtils extends Logging {
+ val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
+ val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
+ val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
+ val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
+
+ val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
+ val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
+ val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
+
+ val DEFAULT_ROLLING_INTERVAL_SECS = 60
+ val DEFAULT_MAX_FAILURES = 3
+
+ def enableReceiverLog(conf: SparkConf): Boolean = {
+ conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
+ }
+
+ def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = {
+ if (isDriver) {
+ conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ } else {
+ conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ }
+ }
+
+ def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = {
+ if (isDriver) {
+ conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ } else {
+ conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ }
+ }
+
+ /**
+ * Create a WriteAheadLog for the driver. If a custom WAL class is configured, it will try to
+ * create an instance of that class; otherwise it will create the default FileBasedWriteAheadLog.
+ */
+ def createLogForDriver(
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+ createLog(true, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+ }
+
+ /**
+ * Create a WriteAheadLog for the receiver. If a custom WAL class is configured, it will try to
+ * create an instance of that class; otherwise it will create the default FileBasedWriteAheadLog.
+ */
+ def createLogForReceiver(
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+ createLog(false, sparkConf, fileWalLogDirectory, fileWalHadoopConf)
+ }
+
+ /**
+ * Create a WriteAheadLog based on the value of the given config key. The config key is used
+ * to get the class name from the SparkConf. If a class is configured, it will try to
+ * create an instance of that class by first trying `new CustomWAL(sparkConf)` and then
+ * falling back to `new CustomWAL()`. If neither constructor exists, creation fails. If no
+ * class is configured, it will create the default FileBasedWriteAheadLog.
+ */
+ private def createLog(
+ isDriver: Boolean,
+ sparkConf: SparkConf,
+ fileWalLogDirectory: String,
+ fileWalHadoopConf: Configuration
+ ): WriteAheadLog = {
+
+ val classNameOption = if (isDriver) {
+ sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)
+ } else {
+ sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)
+ }
+ classNameOption.map { className =>
+ try {
+ instantiateClass(
+ Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
+ } catch {
+ case NonFatal(e) =>
+ throw new SparkException(s"Could not create a write ahead log of class $className", e)
+ }
+ }.getOrElse {
+ new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
+ getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver))
+ }
+ }
+
+ /** Instantiate the class, using either the one-arg (SparkConf) constructor or the zero-arg one */
+ private def instantiateClass(cls: Class[_ <: WriteAheadLog], conf: SparkConf): WriteAheadLog = {
+ try {
+ cls.getConstructor(classOf[SparkConf]).newInstance(conf)
+ } catch {
+ case nsme: NoSuchMethodException =>
+ cls.getConstructor().newInstance()
+ }
+ }
+}
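
To make the pluggability concrete: any subclass of WriteAheadLog reachable on the classpath can
be named under the driver or receiver class key, provided it has a (SparkConf) or zero-arg
constructor, per instantiateClass above. A minimal in-memory plugin sketched in Scala, analogous
to the Java test suite later in this commit (class and handle names are illustrative):

    import java.nio.ByteBuffer
    import java.util.{Iterator => JIterator}

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

    case class InMemoryHandle(index: Int) extends WriteAheadLogRecordHandle

    // The (SparkConf) constructor matches the first signature instantiateClass tries.
    class InMemoryWriteAheadLog(conf: SparkConf) extends WriteAheadLog {
      private case class Entry(index: Int, time: Long, data: ByteBuffer)
      private val entries = new ArrayBuffer[Entry]
      private var nextIndex = 0

      override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle =
        synchronized {
          entries += Entry(nextIndex, time, record)
          nextIndex += 1
          InMemoryHandle(nextIndex - 1)
        }

      override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
        val idx = handle.asInstanceOf[InMemoryHandle].index
        entries.find(_.index == idx).map(_.data).orNull
      }

      override def readAll(): JIterator[ByteBuffer] = synchronized {
        entries.map(_.data).iterator.asJava
      }

      override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
        val stale = entries.filter(_.time < threshTime)
        entries --= stale
      }

      override def close(): Unit = synchronized { entries.clear() }
    }

Selecting it is then just conf.set("spark.streaming.driver.writeAheadLog.class",
classOf[InMemoryWriteAheadLog].getName), which is exactly the path the Java test below exercises.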
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
deleted file mode 100644
index 679f6a6..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.util
-
-import java.io._
-import java.net.URI
-import java.nio.ByteBuffer
-
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
-
-/**
- * A writer for writing byte-buffers to a write ahead log file.
- */
-private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
- extends Closeable {
-
- private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
-
- private lazy val hadoopFlushMethod = {
- // Use reflection to get the right flush operation
- val cls = classOf[FSDataOutputStream]
- Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
- }
-
- private var nextOffset = stream.getPos()
- private var closed = false
-
- /** Write the bytebuffer to the log file */
- def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
- assertOpen()
- data.rewind() // Rewind to ensure all data in the buffer is retrieved
- val lengthToWrite = data.remaining()
- val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
- stream.writeInt(lengthToWrite)
- if (data.hasArray) {
- stream.write(data.array())
- } else {
- // If the buffer is not backed by an array, we transfer using temp array
- // Note that despite the extra array copy, this should be faster than byte-by-byte copy
- while (data.hasRemaining) {
- val array = new Array[Byte](data.remaining)
- data.get(array)
- stream.write(array)
- }
- }
- flush()
- nextOffset = stream.getPos()
- segment
- }
-
- override def close(): Unit = synchronized {
- closed = true
- stream.close()
- }
-
- private def flush() {
- hadoopFlushMethod.foreach { _.invoke(stream) }
- // Useful for local file system where hflush/sync does not work (HADOOP-7844)
- stream.getWrappedStream.flush()
- }
-
- private def assertOpen() {
- HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
- }
-}
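
One detail of the writer worth spelling out: hflush() exists only on newer Hadoop, older 1.x
releases call it sync(), and on the local filesystem both can be no-ops (HADOOP-7844), hence the
extra flush of the wrapped stream. The same version-tolerant lookup, isolated as a sketch
(DurableFlush is a hypothetical name):

    import java.lang.reflect.Method

    import scala.util.Try

    import org.apache.hadoop.fs.FSDataOutputStream

    object DurableFlush {
      // Resolve hflush() (newer Hadoop) or sync() (Hadoop 1.x) once, via reflection.
      private val hadoopFlushMethod: Option[Method] = {
        val cls = classOf[FSDataOutputStream]
        Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
      }

      def apply(stream: FSDataOutputStream): Unit = {
        hadoopFlushMethod.foreach(_.invoke(stream))
        // Local FS fallback, where hflush/sync does not persist (HADOOP-7844).
        stream.getWrappedStream.flush()
      }
    }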
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
new file mode 100644
index 0000000..50e8f9f
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.util.WriteAheadLog;
+import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
+import org.apache.spark.streaming.util.WriteAheadLogUtils;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
+ int index = -1;
+ public JavaWriteAheadLogSuiteHandle(int idx) {
+ index = idx;
+ }
+}
+
+public class JavaWriteAheadLogSuite extends WriteAheadLog {
+
+ class Record {
+ long time;
+ int index;
+ ByteBuffer buffer;
+
+ public Record(long tym, int idx, ByteBuffer buf) {
+ index = idx;
+ time = tym;
+ buffer = buf;
+ }
+ }
+ private int index = -1;
+ private ArrayList<Record> records = new ArrayList<Record>();
+
+
+ // Methods for WriteAheadLog
+ @Override
+ public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) {
+ index += 1;
+ records.add(new Record(time, index, record));
+ return new JavaWriteAheadLogSuiteHandle(index);
+ }
+
+ @Override
+ public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) {
+ if (handle instanceof JavaWriteAheadLogSuiteHandle) {
+ int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index;
+ for (Record record: records) {
+ if (record.index == reqdIndex) {
+ return record.buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public java.util.Iterator<java.nio.ByteBuffer> readAll() {
+ Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() {
+ @Override
+ public Object transform(Object input) {
+ return ((Record) input).buffer;
+ }
+ });
+ return buffers.iterator();
+ }
+
+ @Override
+ public void clean(long threshTime, boolean waitForCompletion) {
+ for (int i = 0; i < records.size(); i++) {
+ if (records.get(i).time < threshTime) {
+ records.remove(i);
+ i--;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ records.clear();
+ }
+
+ @Test
+ public void testCustomWAL() {
+ SparkConf conf = new SparkConf();
+ conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+ WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
+
+ String data1 = "data1";
+ WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234);
+ Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
+ Assert.assertTrue(new String(wal.read(handle).array()).equals(data1));
+
+ wal.write(ByteBuffer.wrap("data2".getBytes()), 1235);
+ wal.write(ByteBuffer.wrap("data3".getBytes()), 1236);
+ wal.write(ByteBuffer.wrap("data4".getBytes()), 1237);
+ wal.clean(1236, false);
+
+ java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll();
+ ArrayList<String> readData = new ArrayList<String>();
+ while (dataIterator.hasNext()) {
+ readData.add(new String(dataIterator.next().array()));
+ }
+ Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4")));
+ }
+}
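
The test above wires a custom class through the driver-side key; the receiver side follows the
same pattern with the receiver-prefixed keys defined in WriteAheadLogUtils. A configuration
sketch (the class name is a placeholder; the last two keys only affect the default file-based
WAL, since createLog passes them to FileBasedWriteAheadLog alone):

    import org.apache.spark.SparkConf

    object WalConfigSketch {
      val conf: SparkConf = new SparkConf()
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        .set("spark.streaming.receiver.writeAheadLog.class", "com.example.MyWriteAheadLog")
        // Only used when no custom class is set and the file-based WAL is created:
        .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "30")
        .set("spark.streaming.receiver.writeAheadLog.maxFailures", "5")
    }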
http://git-wip-us.apache.org/repos/asf/spark/blob/1868bd40/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index c090eae..2380423 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -43,7 +43,7 @@ import WriteAheadLogSuite._
class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val hadoopConf = new Configuration()
val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
@@ -130,10 +130,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
"Unexpected store result type"
)
// Verify the data in write ahead log files is correct
- val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
- val loggedData = fileSegments.flatMap { segment =>
- val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
- val bytes = reader.read(segment)
+ val walSegments = storeResults.map { result =>
+ result.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
+ }
+ val loggedData = walSegments.flatMap { walSegment =>
+ val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment]
+ val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+ val bytes = reader.read(fileSegment)
reader.close()
blockManager.dataDeserialize(generateBlockId(), bytes).toList
}
@@ -148,13 +151,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
}
}
- test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+ test("WriteAheadLogBasedBlockHandler - clean old blocks") {
withWriteAheadLogBasedBlockHandler { handler =>
val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
storeBlocks(handler, blocks)
val preCleanupLogFiles = getWriteAheadLogFiles()
- preCleanupLogFiles.size should be > 1
+ require(preCleanupLogFiles.size > 1)
// this depends on the number of blocks inserted using generateAndStoreData()
manualClock.getTimeMillis() shouldEqual 5000L
@@ -218,6 +221,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
/** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
+ require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org