You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/08 00:04:04 UTC
kafka git commit: MINOR: Fix incorrect pattern matching on `version` in `CheckpointFile`
Repository: kafka
Updated Branches:
refs/heads/trunk 67fc2a91a -> 82a8e83de
MINOR: Fix incorrect pattern matching on `version` in `CheckpointFile`
Also add test and refactor things a little to make testing easier.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ben Stopford <be...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #2822 from ijuma/hotfix-checkpoint-file
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82a8e83d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82a8e83d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82a8e83d
Branch: refs/heads/trunk
Commit: 82a8e83de622f777a81e9ea0c01d6832f44e7956
Parents: 67fc2a9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Apr 8 01:03:49 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Apr 8 01:03:49 2017 +0100
----------------------------------------------------------------------
.../server/checkpoints/CheckpointFile.scala | 13 +++---
.../checkpoints/LeaderEpochCheckpointFile.scala | 49 ++++++++++----------
.../checkpoints/OffsetCheckpointFile.scala | 48 ++++++++++---------
.../checkpoints/OffsetCheckpointFileTest.scala | 15 +++++-
4 files changed, 68 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index c7e95d9..cc50620 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -59,7 +59,7 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
} catch {
case e: FileNotFoundException =>
if (FileSystems.getDefault.isReadOnly) {
- fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
+ fatal(s"Halting writes to checkpoint file (${file.getAbsolutePath}) because the underlying file system is inaccessible: ", e)
Exit.halt(1)
}
throw e
@@ -73,7 +73,7 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
def read(): Seq[T] = {
def malformedLineException(line: String) =
- new IOException(s"Malformed line in offset checkpoint file: $line'")
+ new IOException(s"Malformed line in checkpoint file (${file.getAbsolutePath}): $line'")
lock synchronized {
val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
@@ -82,9 +82,8 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
line = reader.readLine()
if (line == null)
return Seq.empty
- val version = line.toInt
- version match {
- case version =>
+ line.toInt match {
+ case fileVersion if fileVersion == version =>
line = reader.readLine()
if (line == null)
return Seq.empty
@@ -101,10 +100,10 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
}
}
if (entries.size != expectedSize)
- throw new IOException(s"Expected $expectedSize entries but found only ${entries.size}")
+ throw new IOException(s"Expected $expectedSize entries in checkpoint file (${file.getAbsolutePath}), but found only ${entries.size}")
entries
case _ =>
- throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
+ throw new IOException(s"Unrecognized version of the checkpoint file (${file.getAbsolutePath}): " + version)
}
} catch {
case _: NumberFormatException => throw malformedLineException(line)
http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index 9de7564..d32d30f 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -19,7 +19,6 @@ package kafka.server.checkpoints
import java.io._
import java.util.regex.Pattern
-import kafka.server.checkpoints.LeaderEpochCheckpointConstants.{CurrentVersion, WhiteSpacesPattern}
import kafka.server.epoch.EpochEntry
import scala.collection._
@@ -31,37 +30,37 @@ trait LeaderEpochCheckpoint {
object LeaderEpochFile {
private val LeaderEpochCheckpointFilename = "leader-epoch-checkpoint"
- def newFile(dir: File) = {new File(dir, LeaderEpochCheckpointFilename)}
+ def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
}
-private object LeaderEpochCheckpointConstants {
- val WhiteSpacesPattern = Pattern.compile("\\s+")
- val CurrentVersion = 0
+object LeaderEpochCheckpointFile {
+ private val WhiteSpacesPattern = Pattern.compile("\\s+")
+ private val CurrentVersion = 0
+
+ object Formatter extends CheckpointFileFormatter[EpochEntry] {
+
+ override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
+
+ override def fromLine(line: String): Option[EpochEntry] = {
+ WhiteSpacesPattern.split(line) match {
+ case Array(epoch, offset) =>
+ Some(EpochEntry(epoch.toInt, offset.toLong))
+ case _ => None
+ }
+ }
+
+ }
}
/**
* This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
*/
-class LeaderEpochCheckpointFile(val file: File) extends CheckpointFileFormatter[EpochEntry] with LeaderEpochCheckpoint {
- val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, this)
+class LeaderEpochCheckpointFile(val file: File) extends LeaderEpochCheckpoint {
+ import LeaderEpochCheckpointFile._
- override def toLine(entry: EpochEntry): String = {
- s"${entry.epoch} ${entry.startOffset}"
- }
+ val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter)
- override def fromLine(line: String): Option[EpochEntry] = {
- WhiteSpacesPattern.split(line) match {
- case Array(epoch, offset) =>
- Some(EpochEntry(epoch.toInt, offset.toLong))
- case _ => None
- }
- }
-
- def write(epochs: Seq[EpochEntry]) = {
- checkpoint.write(epochs)
- }
+ def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
- def read(): Seq[EpochEntry] = {
- checkpoint.read()
- }
-}
\ No newline at end of file
+ def read(): Seq[EpochEntry] = checkpoint.read()
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 12ec986..5f5dc97 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -18,13 +18,29 @@ package kafka.server.checkpoints
import java.io._
import java.util.regex.Pattern
+
import kafka.server.epoch.EpochEntry
import org.apache.kafka.common.TopicPartition
+
import scala.collection._
-private object OffsetCheckpointConstants {
- val WhiteSpacesPattern = Pattern.compile("\\s+")
- val CurrentVersion = 0
+object OffsetCheckpointFile {
+ private val WhiteSpacesPattern = Pattern.compile("\\s+")
+ private[checkpoints] val CurrentVersion = 0
+
+ object Formatter extends CheckpointFileFormatter[(TopicPartition, Long)] {
+ override def toLine(entry: (TopicPartition, Long)): String = {
+ s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
+ }
+
+ override def fromLine(line: String): Option[(TopicPartition, Long)] = {
+ WhiteSpacesPattern.split(line) match {
+ case Array(topic, partition, offset) =>
+ Some(new TopicPartition(topic, partition.toInt), offset.toLong)
+ case _ => None
+ }
+ }
+ }
}
trait OffsetCheckpoint {
@@ -35,26 +51,12 @@ trait OffsetCheckpoint {
/**
* This class persists a map of (Partition => Offsets) to a file (for a certain replica)
*/
-class OffsetCheckpointFile(val f: File) extends CheckpointFileFormatter[(TopicPartition, Long)] {
- val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointConstants.CurrentVersion, this)
+class OffsetCheckpointFile(val f: File) {
+ val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
+ OffsetCheckpointFile.Formatter)
- override def toLine(entry: (TopicPartition, Long)): String = {
- s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
- }
+ def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
- override def fromLine(line: String): Option[(TopicPartition, Long)] = {
- OffsetCheckpointConstants.WhiteSpacesPattern.split(line) match {
- case Array(topic, partition, offset) =>
- Some(new TopicPartition(topic, partition.toInt), offset.toLong)
- case _ => None
- }
- }
+ def read(): Map[TopicPartition, Long] = checkpoint.read().toMap
- def write(offsets: Map[TopicPartition, Long]) = {
- checkpoint.write(offsets.toSeq)
- }
-
- def read(): Map[TopicPartition, Long] = {
- checkpoint.read().toMap
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/82a8e83d/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index cc49ccf..4daaa72 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package unit.kafka.server.checkpoints
-import kafka.server.checkpoints.{OffsetCheckpointFile}
+package kafka.server.checkpoints
+
+import java.io.IOException
+
import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
@@ -86,4 +88,13 @@ class OffsetCheckpointFileTest extends JUnitSuite with Logging {
//Then
assertEquals(Map(), checkpoint.read())
}
+
+ @Test(expected = classOf[IOException])
+ def shouldThrowIfVersionIsNotRecognised(): Unit = {
+ val checkpointFile = new CheckpointFile(TestUtils.tempFile(), OffsetCheckpointFile.CurrentVersion + 1,
+ OffsetCheckpointFile.Formatter)
+ checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
+ new OffsetCheckpointFile(checkpointFile.file).read()
+ }
+
}