You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/08 00:04:04 UTC

kafka git commit: MINOR: Fix incorrect pattern matching on `version` in `CheckpointFile`

Repository: kafka
Updated Branches:
  refs/heads/trunk 67fc2a91a -> 82a8e83de


MINOR: Fix incorrect pattern matching on `version` in `CheckpointFile`

Also add a test for the unrecognised-version case and move the line formatters into companion objects to make testing easier.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ben Stopford <be...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #2822 from ijuma/hotfix-checkpoint-file


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82a8e83d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82a8e83d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82a8e83d

Branch: refs/heads/trunk
Commit: 82a8e83de622f777a81e9ea0c01d6832f44e7956
Parents: 67fc2a9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Apr 8 01:03:49 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Apr 8 01:03:49 2017 +0100

----------------------------------------------------------------------
 .../server/checkpoints/CheckpointFile.scala     | 13 +++---
 .../checkpoints/LeaderEpochCheckpointFile.scala | 49 ++++++++++----------
 .../checkpoints/OffsetCheckpointFile.scala      | 48 ++++++++++---------
 .../checkpoints/OffsetCheckpointFileTest.scala  | 15 +++++-
 4 files changed, 68 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index c7e95d9..cc50620 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -59,7 +59,7 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
       } catch {
         case e: FileNotFoundException =>
           if (FileSystems.getDefault.isReadOnly) {
-            fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
+            fatal(s"Halting writes to checkpoint file (${file.getAbsolutePath}) because the underlying file system is inaccessible: ", e)
             Exit.halt(1)
           }
           throw e
@@ -73,7 +73,7 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
 
   def read(): Seq[T] = {
     def malformedLineException(line: String) =
-      new IOException(s"Malformed line in offset checkpoint file: $line'")
+      new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
 
     lock synchronized {
       val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
@@ -82,9 +82,8 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
         line = reader.readLine()
         if (line == null)
           return Seq.empty
-        val version = line.toInt
-        version match {
-          case version =>
+        line.toInt match {
+          case fileVersion if fileVersion == version =>
             line = reader.readLine()
             if (line == null)
               return Seq.empty
@@ -101,10 +100,10 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
               }
             }
             if (entries.size != expectedSize)
-              throw new IOException(s"Expected $expectedSize entries but found only ${entries.size}")
+              throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
             entries
           case _ =>
-            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+            throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
         }
       } catch {
         case _: NumberFormatException => throw malformedLineException(line)

http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index 9de7564..d32d30f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -19,7 +19,6 @@ package kafka.server.checkpoints
 import java.io._
 import java.util.regex.Pattern
 
-import kafka.server.checkpoints.LeaderEpochCheckpointConstants.{CurrentVersion, WhiteSpacesPattern}
 import kafka.server.epoch.EpochEntry
 
 import scala.collection._
@@ -31,37 +30,37 @@ trait LeaderEpochCheckpoint {
 
 object LeaderEpochFile {
   private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
-  def newFile(dir: File) = {new File(dir, LeaderEpochCheckpointFilename)}
+  def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
 }
 
-private object LeaderEpochCheckpointConstants {
-  val WhiteSpacesPattern = Pattern.compile("\\s+")
-  val CurrentVersion = 0
+object LeaderEpochCheckpointFile {
+  private val WhiteSpacesPattern = Pattern.compile("\\s+")
+  private val CurrentVersion = 0
+
+  object Formatter extends CheckpointFileFormatter[EpochEntry] {
+
+    override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
+
+    override def fromLine(line: String): Option[EpochEntry] = {
+      WhiteSpacesPattern.split(line) match {
+        case Array(epoch, offset) =>
+          Some(EpochEntry(epoch.toInt, offset.toLong))
+        case _ => None
+      }
+    }
+
+  }
 }
 
 /**
   * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
   */
-class LeaderEpochCheckpointFile(val file: File) extends CheckpointFileFormatter[EpochEntry] with LeaderEpochCheckpoint {
-  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, this)
+class LeaderEpochCheckpointFile(val file: File) extends LeaderEpochCheckpoint {
+  import LeaderEpochCheckpointFile._
 
-  override def toLine(entry: EpochEntry): String = {
-    s"${entry.epoch} ${entry.startOffset}"
-  }
+  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter)
 
-  override def fromLine(line: String): Option[EpochEntry] = {
-    WhiteSpacesPattern.split(line) match {
-      case Array(epoch, offset) =>
-        Some(EpochEntry(epoch.toInt, offset.toLong))
-      case _ => None
-    }
-  }
-
-  def write(epochs: Seq[EpochEntry]) = {
-    checkpoint.write(epochs)
-  }
+  def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
 
-  def read(): Seq[EpochEntry] = {
-    checkpoint.read()
-  }
-}
\ No newline at end of file
+  def read(): Seq[EpochEntry] = checkpoint.read()
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 12ec986..5f5dc97 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -18,13 +18,29 @@ package kafka.server.checkpoints
 
 import java.io._
 import java.util.regex.Pattern
+
 import kafka.server.epoch.EpochEntry
 import org.apache.kafka.common.TopicPartition
+
 import scala.collection._
 
-private object OffsetCheckpointConstants {
-  val WhiteSpacesPattern = Pattern.compile("\\s+")
-  val CurrentVersion = 0
+object OffsetCheckpointFile {
+  private val WhiteSpacesPattern = Pattern.compile("\\s+")
+  private[checkpoints] val CurrentVersion = 0
+
+  object Formatter extends CheckpointFileFormatter[(TopicPartition, Long)] {
+    override def toLine(entry: (TopicPartition, Long)): String = {
+      s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
+    }
+
+    override def fromLine(line: String): Option[(TopicPartition, Long)] = {
+      WhiteSpacesPattern.split(line) match {
+        case Array(topic, partition, offset) =>
+          Some(new TopicPartition(topic, partition.toInt), offset.toLong)
+        case _ => None
+      }
+    }
+  }
 }
 
 trait OffsetCheckpoint {
@@ -35,26 +51,12 @@ trait OffsetCheckpoint {
 /**
   * This class persists a map of (Partition => Offsets) to a file (for a certain replica)
   */
-class OffsetCheckpointFile(val f: File) extends CheckpointFileFormatter[(TopicPartition, Long)] {
-  val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointConstants.CurrentVersion, this)
+class OffsetCheckpointFile(val f: File) {
+  val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
+    OffsetCheckpointFile.Formatter)
 
-  override def toLine(entry: (TopicPartition, Long)): String = {
-    s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
-  }
+  def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
 
-  override def fromLine(line: String): Option[(TopicPartition, Long)] = {
-    OffsetCheckpointConstants.WhiteSpacesPattern.split(line) match {
-      case Array(topic, partition, offset) =>
-        Some(new TopicPartition(topic, partition.toInt), offset.toLong)
-      case _ => None
-    }
-  }
+  def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
 
-  def write(offsets: Map[TopicPartition, Long]) = {
-    checkpoint.write(offsets.toSeq)
-  }
-
-  def read(): Map[TopicPartition, Long] = {
-    checkpoint.read().toMap
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index cc49ccf..4daaa72 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -14,8 +14,10 @@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
-package unit.kafka.server.checkpoints
-import kafka.server.checkpoints.{OffsetCheckpointFile}
+package kafka.server.checkpoints
+
+import java.io.IOException
+
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
@@ -86,4 +88,13 @@ class OffsetCheckpointFileTest extends JUnitSuite with Logging {
     //Then
     assertEquals(Map(), checkpoint.read())
   }
+
+  @Test(expected = classOf[IOException])
+  def shouldThrowIfVersionIsNotRecognised(): Unit = {
+    val checkpointFile = new CheckpointFile(TestUtils.tempFile(), OffsetCheckpointFile.CurrentVersion + 1,
+      OffsetCheckpointFile.Formatter)
+    checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
+    new OffsetCheckpointFile(checkpointFile.file).read()
+  }
+
 }