Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/13 01:16:26 UTC

kafka git commit: KAFKA-3063; LogRecoveryTest causes JVM to exit occasionally

Repository: kafka
Updated Branches:
  refs/heads/trunk 72eebad43 -> 836cb1963


KAFKA-3063; LogRecoveryTest causes JVM to exit occasionally

Remove the deletion of the tmp file in `OffsetCheckpoint`'s constructor. This delete causes unintuitive behaviour: `LogRecoveryTest`, for example, could trigger a `System.exit` because the test creates an `OffsetCheckpoint` instance just to call `read()` on it, which unexpectedly deletes a file that another `OffsetCheckpoint` instance is in the middle of writing.
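
For illustration, a minimal Scala sketch of the problematic pre-fix pattern (a simplified, hypothetical `OldStyleCheckpoint`, not the real class; the removed lines in the diff below show the actual code):

import java.io.{File, FileOutputStream, IOException}

// Simplified sketch of the pre-fix behaviour; not the actual Kafka class.
class OldStyleCheckpoint(val file: File) {
  // Deleting the temp file in the constructor is the root of the problem: an
  // instance created only to call read() removes the temp file that another
  // instance's in-flight write() is about to rename over the checkpoint file.
  new File(file.getAbsolutePath + ".tmp").delete()

  def write(contents: Array[Byte]): Unit = {
    val temp = new File(file.getAbsolutePath + ".tmp")
    val out = new FileOutputStream(temp)
    try out.write(contents) finally out.close()
    // If a concurrent constructor deleted the temp file before this rename,
    // the swap fails; the broker treats a failed high-watermark checkpoint
    // write as fatal, which is the occasional JVM exit seen in the test.
    if (!temp.renameTo(file))
      throw new IOException(s"File rename from $temp to $file failed.")
  }
}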

Also:
* Improve error-handling in `OffsetCheckpoint`
* Minor performance improvements in `read()` (see the parsing sketch after this list)
* Minor clean-ups to `ReplicaManager` and `LogRecoveryTest`
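
For the error-handling and performance items above, a minimal sketch of the line-parsing approach used by the new `read()` (the object name here is hypothetical; the precompiled pattern mirrors the diff below):

import java.io.IOException
import java.util.regex.Pattern

// A precompiled Pattern avoids recompiling "\s+" for every checkpoint line,
// and matching on the split Array replaces the manual length check.
object CheckpointLineParser {
  private val WhiteSpacesPattern = Pattern.compile("\\s+")

  def parse(line: String): (String, Int, Long) =
    WhiteSpacesPattern.split(line) match {
      case Array(topic, partition, offset) => (topic, partition.toInt, offset.toLong)
      case _ => throw new IOException(s"Malformed line in offset checkpoint file: '$line'")
    }
}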

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #759 from ijuma/kafka-3063-log-recovery-test-exits-jvm


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/836cb196
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/836cb196
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/836cb196

Branch: refs/heads/trunk
Commit: 836cb1963330a9e342379899e0fe52b72347736e
Parents: 72eebad
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jan 12 16:16:10 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jan 12 16:16:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/Utils.java    | 23 ++++++
 .../scala/kafka/server/OffsetCheckpoint.scala   | 82 ++++++++++----------
 .../scala/kafka/server/ReplicaManager.scala     |  6 +-
 .../unit/kafka/server/LogRecoveryTest.scala     | 18 ++---
 4 files changed, 78 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e725722..8df54a4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -23,6 +23,9 @@ import java.io.StringWriter;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -659,4 +662,24 @@ public class Utils {
             return cl;
     }
 
+    /**
+     * Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
+     *
+     * @throws IOException if both atomic and non-atomic moves fail
+     */
+    public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
+        try {
+            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
+        } catch (IOException outer) {
+            try {
+                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
+                log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+                        + outer.getMessage());
+            } catch (IOException inner) {
+                inner.addSuppressed(outer);
+                throw inner;
+            }
+        }
+    }
+
 }
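
For reference, a hedged usage sketch of the new helper called from Scala (the file names are hypothetical; the checkpoint code below calls it the same way):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.kafka.common.utils.Utils

// Write new contents to a temp path, then swap it over the destination
// atomically, falling back to a plain replace if the atomic move fails.
object AtomicSwapExample {
  def main(args: Array[String]): Unit = {
    val target = Paths.get("replication-offset-checkpoint")
    val temp = Paths.get("replication-offset-checkpoint.tmp")
    Files.write(temp, "0\n0\n".getBytes(StandardCharsets.UTF_8))
    Utils.atomicMoveWithFallback(temp, target)
  }
}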

http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 8c5b054..fe1d823 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -16,95 +16,99 @@
  */
 package kafka.server
 
+import java.nio.file.Paths
+import java.util.regex.Pattern
+
+import org.apache.kafka.common.utils.Utils
+
 import scala.collection._
 import kafka.utils.Logging
 import kafka.common._
 import java.io._
 
+object OffsetCheckpoint {
+  private val WhiteSpacesPattern = Pattern.compile("\\s+")
+  private val CurrentVersion = 0
+}
+
 /**
  * This class saves out a map of topic/partition=>offsets to a file
  */
 class OffsetCheckpoint(val file: File) extends Logging {
+  import OffsetCheckpoint._
+  private val path = file.toPath.toAbsolutePath
+  private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
-  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
   file.createNewFile() // in case the file doesn't exist
 
   def write(offsets: Map[TopicAndPartition, Long]) {
     lock synchronized {
       // write to temp file and then swap with the existing file
-      val temp = new File(file.getAbsolutePath + ".tmp")
-
-      val fileOutputStream = new FileOutputStream(temp)
+      val fileOutputStream = new FileOutputStream(tempPath.toFile)
       val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
       try {
-        // write the current version
-        writer.write(0.toString)
+        writer.write(CurrentVersion.toString)
         writer.newLine()
-      
-        // write the number of entries
+
         writer.write(offsets.size.toString)
         writer.newLine()
 
-        // write the entries
         offsets.foreach { case (topicPart, offset) =>
-          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
+          writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
           writer.newLine()
         }
-      
-        // flush the buffer and then fsync the underlying file
+
         writer.flush()
         fileOutputStream.getFD().sync()
       } finally {
         writer.close()
       }
-      
-      // swap new offset checkpoint file with previous one
-      if(!temp.renameTo(file)) {
-        // renameTo() fails on Windows if the destination file exists.
-        file.delete()
-        if(!temp.renameTo(file))
-          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
-      }
+
+      Utils.atomicMoveWithFallback(tempPath, path)
     }
   }
 
   def read(): Map[TopicAndPartition, Long] = {
+
+    def malformedLineException(line: String) =
+      throw new IOException(s"Malformed line in offset checkpoint file: '$line'")
+
     lock synchronized {
       val reader = new BufferedReader(new FileReader(file))
+      var line: String = null
       try {
-        var line = reader.readLine()
-        if(line == null)
+        line = reader.readLine()
+        if (line == null)
           return Map.empty
         val version = line.toInt
         version match {
-          case 0 =>
+          case CurrentVersion =>
             line = reader.readLine()
-            if(line == null)
+            if (line == null)
               return Map.empty
             val expectedSize = line.toInt
-            var offsets = Map[TopicAndPartition, Long]()
+            val offsets = mutable.Map[TopicAndPartition, Long]()
             line = reader.readLine()
-            while(line != null) {
-              val pieces = line.split("\\s+")
-              if(pieces.length != 3)
-                throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line))
-              
-              val topic = pieces(0)
-              val partition = pieces(1).toInt
-              val offset = pieces(2).toLong
-              offsets += (TopicAndPartition(topic, partition) -> offset)
-              line = reader.readLine()
+            while (line != null) {
+              WhiteSpacesPattern.split(line) match {
+                case Array(topic, partition, offset) =>
+                  offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong
+                  line = reader.readLine()
+                case _ => throw malformedLineException(line)
+              }
             }
-            if(offsets.size != expectedSize)
-              throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
+            if (offsets.size != expectedSize)
+              throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
             offsets
-          case _ => 
+          case _ =>
             throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
         }
+      } catch {
+        case e: NumberFormatException => malformedLineException(line)
       } finally {
         reader.close()
       }
     }
   }
   
-}
\ No newline at end of file
+}
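
To round out the checkpoint change, a small round-trip sketch (topic and file names are hypothetical; `TopicAndPartition` and `OffsetCheckpoint` come from the Kafka core module shown above):

import java.io.File
import kafka.common.TopicAndPartition
import kafka.server.OffsetCheckpoint

// Write a small high-watermark map, then read it back from the same file.
object CheckpointRoundTrip {
  def main(args: Array[String]): Unit = {
    val checkpoint = new OffsetCheckpoint(new File("replication-offset-checkpoint"))
    checkpoint.write(Map(TopicAndPartition("test-topic", 0) -> 42L))
    val hw = checkpoint.read().getOrElse(TopicAndPartition("test-topic", 0), 0L)
    println(hw) // prints 42
  }
}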

http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5b1276e..d1e549d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -869,10 +869,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+    val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId))
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
-    for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
+    for ((dir, reps) <- replicasByDir) {
+      val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 7a434aa..d11c40f 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -55,9 +55,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   val message = "hello"
 
   var producer: Producer[Int, String] = null
-  def hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
-  def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+  def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  var servers = Seq.empty[KafkaServer]
 
   // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
   // to use a new producer that knows the new ports
@@ -81,7 +81,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     servers = List(server1, server2)
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers)
 
     // create the producer
     updateProducer()
@@ -90,7 +90,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     producer.close()
-    for(server <- servers) {
+    for (server <- servers) {
       server.shutdown()
       CoreUtils.rm(server.config.logDirs(0))
     }
@@ -107,7 +107,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages,
       "Failed to update high watermark for follower after timeout")
 
-    servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
+    servers.foreach(_.replicaManager.checkpointHighWatermarks())
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -160,7 +160,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
-    servers.foreach(server => server.shutdown())
+    servers.foreach(_.shutdown())
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }
@@ -174,7 +174,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
-    servers.foreach(server => server.shutdown())
+    servers.foreach(_.shutdown())
     val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
@@ -224,7 +224,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
       server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
-    servers.foreach(server => server.shutdown())
+    servers.foreach(_.shutdown())
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
   }