You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/13 01:16:26 UTC
kafka git commit: KAFKA-3063; LogRecoveryTest causes JVM to exit occasionally
Repository: kafka
Updated Branches:
refs/heads/trunk 72eebad43 -> 836cb1963
KAFKA-3063; LogRecoveryTest causes JVM to exit occasionally
Remove the deletion of the tmp file in `OffsetCheckpoint`'s constructor. This deletion causes unintuitive behaviour, such as `LogRecoveryTest` triggering a `System.exit`. The test creates an instance of `OffsetCheckpoint` only to call `read()` on it, and in doing so the constructor unexpectedly deletes a tmp file that another instance of `OffsetCheckpoint` is in the middle of writing.
Also:
* Improve error-handling in `OffsetCheckpoint`
* Minor performance improvements in `read()`
* Minor clean-ups to `ReplicaManager` and `LogRecoveryTest`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #759 from ijuma/kafka-3063-log-recovery-test-exits-jvm
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/836cb196
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/836cb196
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/836cb196
Branch: refs/heads/trunk
Commit: 836cb1963330a9e342379899e0fe52b72347736e
Parents: 72eebad
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jan 12 16:16:10 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jan 12 16:16:10 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/common/utils/Utils.java | 23 ++++++
.../scala/kafka/server/OffsetCheckpoint.scala | 82 ++++++++++----------
.../scala/kafka/server/ReplicaManager.scala | 6 +-
.../unit/kafka/server/LogRecoveryTest.scala | 18 ++---
4 files changed, 78 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e725722..8df54a4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -23,6 +23,9 @@ import java.io.StringWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -659,4 +662,24 @@ public class Utils {
return cl;
}
+ /**
+ * Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
+ *
+ * @throws IOException if both atomic and non-atomic moves fail
+ */
+ public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
+ try {
+ Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
+ } catch (IOException outer) {
+ try {
+ Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
+ log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+ + outer.getMessage());
+ } catch (IOException inner) {
+ inner.addSuppressed(outer);
+ throw inner;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 8c5b054..fe1d823 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -16,95 +16,99 @@
*/
package kafka.server
+import java.nio.file.Paths
+import java.util.regex.Pattern
+
+import org.apache.kafka.common.utils.Utils
+
import scala.collection._
import kafka.utils.Logging
import kafka.common._
import java.io._
+object OffsetCheckpoint {
+ private val WhiteSpacesPattern = Pattern.compile("\\s+")
+ private val CurrentVersion = 0
+}
+
/**
* This class saves out a map of topic/partition=>offsets to a file
*/
class OffsetCheckpoint(val file: File) extends Logging {
+ import OffsetCheckpoint._
+ private val path = file.toPath.toAbsolutePath
+ private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
- new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
file.createNewFile() // in case the file doesn't exist
def write(offsets: Map[TopicAndPartition, Long]) {
lock synchronized {
// write to temp file and then swap with the existing file
- val temp = new File(file.getAbsolutePath + ".tmp")
-
- val fileOutputStream = new FileOutputStream(temp)
+ val fileOutputStream = new FileOutputStream(tempPath.toFile)
val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
try {
- // write the current version
- writer.write(0.toString)
+ writer.write(CurrentVersion.toString)
writer.newLine()
-
- // write the number of entries
+
writer.write(offsets.size.toString)
writer.newLine()
- // write the entries
offsets.foreach { case (topicPart, offset) =>
- writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
+ writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
writer.newLine()
}
-
- // flush the buffer and then fsync the underlying file
+
writer.flush()
fileOutputStream.getFD().sync()
} finally {
writer.close()
}
-
- // swap new offset checkpoint file with previous one
- if(!temp.renameTo(file)) {
- // renameTo() fails on Windows if the destination file exists.
- file.delete()
- if(!temp.renameTo(file))
- throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
- }
+
+ Utils.atomicMoveWithFallback(tempPath, path)
}
}
def read(): Map[TopicAndPartition, Long] = {
+
+ def malformedLineException(line: String) =
+ throw new IOException(s"Malformed line in offset checkpoint file: $line'")
+
lock synchronized {
val reader = new BufferedReader(new FileReader(file))
+ var line: String = null
try {
- var line = reader.readLine()
- if(line == null)
+ line = reader.readLine()
+ if (line == null)
return Map.empty
val version = line.toInt
version match {
- case 0 =>
+ case CurrentVersion =>
line = reader.readLine()
- if(line == null)
+ if (line == null)
return Map.empty
val expectedSize = line.toInt
- var offsets = Map[TopicAndPartition, Long]()
+ val offsets = mutable.Map[TopicAndPartition, Long]()
line = reader.readLine()
- while(line != null) {
- val pieces = line.split("\\s+")
- if(pieces.length != 3)
- throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))
-
- val topic = pieces(0)
- val partition = pieces(1).toInt
- val offset = pieces(2).toLong
- offsets += (TopicAndPartition(topic, partition) -> offset)
- line = reader.readLine()
+ while (line != null) {
+ WhiteSpacesPattern.split(line) match {
+ case Array(topic, partition, offset) =>
+ offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong
+ line = reader.readLine()
+ case _ => throw malformedLineException(line)
+ }
}
- if(offsets.size != expectedSize)
- throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
+ if (offsets.size != expectedSize)
+ throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
offsets
- case _ =>
+ case _ =>
throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
}
+ } catch {
+ case e: NumberFormatException => malformedLineException(line)
} finally {
reader.close()
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5b1276e..d1e549d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -869,10 +869,10 @@ class ReplicaManager(val config: KafkaConfig,
// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {
- val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+ val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId))
val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
- for((dir, reps) <- replicasByDir) {
- val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
+ for ((dir, reps) <- replicasByDir) {
+ val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap
try {
highWatermarkCheckpoints(dir).write(hwms)
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a434aa..d11c40f 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -55,9 +55,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
val message = "hello"
var producer: Producer[Int, String] = null
- def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
- def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
- var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+ def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ var servers = Seq.empty[KafkaServer]
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
// to use a new producer that knows the new ports
@@ -81,7 +81,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
servers = List(server1, server2)
// create topic with 1 partition, 2 replicas, one on each broker
- createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+ createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers)
// create the producer
updateProducer()
@@ -90,7 +90,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
producer.close()
- for(server <- servers) {
+ for (server <- servers) {
server.shutdown()
CoreUtils.rm(server.config.logDirs(0))
}
@@ -107,7 +107,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages,
"Failed to update high watermark for follower after timeout")
- servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
+ servers.foreach(_.replicaManager.checkpointHighWatermarks())
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -160,7 +160,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
- servers.foreach(server => server.shutdown())
+ servers.foreach(_.shutdown())
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}
@@ -174,7 +174,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
- servers.foreach(server => server.shutdown())
+ servers.foreach(_.shutdown())
val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, leaderHW)
val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -224,7 +224,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
"Failed to update high watermark for follower after timeout")
// shutdown the servers to allow the hw to be checkpointed
- servers.foreach(server => server.shutdown())
+ servers.foreach(_.shutdown())
assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
}