Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/16 20:05:39 UTC

spark git commit: [SPARK-19721][SS] Good error message for version mismatch in log files

Repository: spark
Updated Branches:
  refs/heads/master 8e8f89833 -> 2ea214dd0


[SPARK-19721][SS] Good error message for version mismatch in log files

## Problem

Structured streaming writes a version identifier (usually `v1`) into several of its log files. However, the places that check this identifier throw a confusing error message when the check fails.
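
For example, the check removed from `OffsetSeqLog` in the diff below reported a version mismatch (shown here with an illustrative version value) as nothing more than:
```
java.lang.IllegalStateException: Unknown log version: v99
```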

## What changes were proposed in this pull request?

This patch makes two major changes:
1. adds a `parseVersion(...)` method and, based on it, fixes version checking in the following places (no other place needed this check):
```
HDFSMetadataLog
  - CompactibleFileStreamLog  ------------> fixed with this patch
    - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  ---> fixed with this patch
```

2. changes the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION`, etc. from `String` to `Int`, so that newer versions can be identified via `version > 1` instead of `version != "v1"` (see the sketch after this list)
    - note this does not break backward compatibility: we still write out `"v1"` and read it back as `"v1"`
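
As a rough sketch of the new checking logic (simplified from the `parseVersion` added to `HDFSMetadataLog` in the diff below; the object name is illustrative only, and the real method additionally rejects malformed versions such as `v0`, `v-1`, `v1x`):
```scala
object VersionCheckSketch {
  // Version constants are now Ints; the "v" prefix is added only when
  // serializing, so a reader can parse the number back out and compare it
  // numerically against the highest version it understands.
  val VERSION = 1

  def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    if (!text.startsWith("v")) {
      throw new IllegalStateException(s"Log file was malformed: $text")
    }
    val version = text.stripPrefix("v").toInt
    if (version > maxSupportedVersion) {
      throw new IllegalStateException(
        s"maximum supported log version is v$maxSupportedVersion, " +
          s"but encountered v$version")
    }
    version
  }
}
```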

## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

Unit tests; see the new and updated test cases in the suites in the diff below.

Author: Liwei Lin <lw...@gmail.com>

Closes #17070 from lw-lin/better-msg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ea214dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ea214dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ea214dd

Branch: refs/heads/master
Commit: 2ea214dd05da929840c15891e908384cfa695ca8
Parents: 8e8f898
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Mar 16 13:05:36 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Mar 16 13:05:36 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala | 14 +++----
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  9 ++++-
 .../streaming/CompactibleFileStreamLog.scala    |  9 ++---
 .../execution/streaming/FileStreamSinkLog.scala |  4 +-
 .../streaming/FileStreamSourceLog.scala         |  4 +-
 .../execution/streaming/HDFSMetadataLog.scala   | 36 ++++++++++++++++++
 .../sql/execution/streaming/OffsetSeqLog.scala  | 10 ++---
 .../CompactibleFileStreamLogSuite.scala         | 40 +++++++++++++++++---
 .../streaming/FileStreamSinkLogSuite.scala      |  8 ++--
 .../streaming/HDFSMetadataLogSuite.scala        | 27 +++++++++++++
 .../execution/streaming/OffsetSeqLogSuite.scala | 17 +++++++++
 11 files changed, 143 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 92b5d91..1fb0a33 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -100,7 +100,7 @@ private[kafka010] class KafkaSource(
         override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
           out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
           val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-          writer.write(VERSION)
+          writer.write("v" + VERSION + "\n")
           writer.write(metadata.json)
           writer.flush
         }
@@ -111,13 +111,13 @@ private[kafka010] class KafkaSource(
           // HDFSMetadataLog guarantees that it never creates a partial file.
           assert(content.length != 0)
           if (content(0) == 'v') {
-            if (content.startsWith(VERSION)) {
-              KafkaSourceOffset(SerializedOffset(content.substring(VERSION.length)))
+            val indexOfNewLine = content.indexOf("\n")
+            if (indexOfNewLine > 0) {
+              val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+              KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
             } else {
-              val versionInFile = content.substring(0, content.indexOf("\n"))
               throw new IllegalStateException(
-                s"Unsupported format. Expected version is ${VERSION.stripLineEnd} " +
-                  s"but was $versionInFile. Please upgrade your Spark.")
+                s"Log file was malformed: failed to detect the log file version line.")
             }
           } else {
             // The log was generated by Spark 2.1.0
@@ -351,7 +351,7 @@ private[kafka010] object KafkaSource {
       | source option "failOnDataLoss" to "false".
     """.stripMargin
 
-  private val VERSION = "v1\n"
+  private[kafka010] val VERSION = 1
 
   def getSortedExecutorList(sc: SparkContext): Array[String] = {
     val bm = sc.env.blockManager

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index bf6aad6..7b6396e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -205,7 +205,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
           override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
             out.write(0)
             val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
-            writer.write(s"v0\n${metadata.json}")
+            writer.write(s"v99999\n${metadata.json}")
             writer.flush
           }
         }
@@ -227,7 +227,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
         source.getOffset.get // Read initial offset
       }
 
-      assert(e.getMessage.contains("Please upgrade your Spark"))
+      Seq(
+        s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 5a6f9e8..408c8f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SparkSession
  * doing a compaction, it will read all old log files and merge them with the new batch.
  */
 abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends HDFSMetadataLog[Array[T]](sparkSession, path) {
@@ -134,7 +134,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(metadataLogVersion.getBytes(UTF_8))
+    out.write(("v" + metadataLogVersion).getBytes(UTF_8))
     logData.foreach { data =>
       out.write('\n')
       out.write(Serialization.write(data).getBytes(UTF_8))
@@ -146,10 +146,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != metadataLogVersion) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+    val version = parseVersion(lines.next(), metadataLogVersion)
     lines.map(Serialization.read[T]).toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index eb6eed8..8d718b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -77,7 +77,7 @@ object SinkFileStatus {
  * (drops the deleted files).
  */
 class FileStreamSinkLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
@@ -106,7 +106,7 @@ class FileStreamSinkLog(
 }
 
 object FileStreamSinkLog {
-  val VERSION = "v1"
+  val VERSION = 1
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 81908c0..33e6a1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 
 class FileStreamSourceLog(
-    metadataLogVersion: String,
+    metadataLogVersion: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
@@ -120,5 +120,5 @@ class FileStreamSourceLog(
 }
 
 object FileStreamSourceLog {
-  val VERSION = "v1"
+  val VERSION = 1
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f9e1f7d..60ce642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -195,6 +195,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       val input = fileManager.open(batchMetadataFile)
       try {
         Some(deserialize(input))
+      } catch {
+        case ise: IllegalStateException =>
+          // re-throw the exception with the log file path added
+          throw new IllegalStateException(
+            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
       } finally {
         IOUtils.closeQuietly(input)
       }
@@ -268,6 +273,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         new FileSystemManager(metadataPath, hadoopConf)
     }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
+    if (text.length > 0 && text(0) == 'v') {
+      val version =
+        try {
+          text.substring(1, text.length).toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+              s"version from $text.")
+        }
+      if (version > 0) {
+        if (version > maxSupportedVersion) {
+          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+        } else {
+          return version
+        }
+      }
+    }
+
+    // reaching here means we failed to read the correct log version
+    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+      s"version from $text.")
+  }
 }
 
 object HDFSMetadataLog {

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 3210d8a..4f8cd11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -55,10 +55,8 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
-    val version = lines.next()
-    if (version != OffsetSeqLog.VERSION) {
-      throw new IllegalStateException(s"Unknown log version: ${version}")
-    }
+
+    val version = parseVersion(lines.next(), OffsetSeqLog.VERSION)
 
     // read metadata
     val metadata = lines.next().trim match {
@@ -70,7 +68,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 
   override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+    out.write(("v" + OffsetSeqLog.VERSION).getBytes(UTF_8))
 
     // write metadata
     out.write('\n')
@@ -88,6 +86,6 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 }
 
 object OffsetSeqLog {
-  private val VERSION = "v1"
+  private[streaming] val VERSION = 1
   private val SERIALIZED_VOID_OFFSET = "-"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 24d92a9..20ac06f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -122,7 +122,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
         val logs = Array("entry_1", "entry_2", "entry_3")
-        val expected = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val expected = s"""v${FakeCompactibleFileStreamLog.VERSION}
             |"entry_1"
             |"entry_2"
             |"entry_3"""".stripMargin
@@ -132,7 +132,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 
         baos.reset()
         compactibleLog.serialize(Array(), baos)
-        assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name()))
+        assert(s"v${FakeCompactibleFileStreamLog.VERSION}" === baos.toString(UTF_8.name()))
       })
   }
 
@@ -142,7 +142,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
       defaultCompactInterval = 3,
       defaultMinBatchesToRetain = 1,
       compactibleLog => {
-        val logs = s"""${FakeCompactibleFileStreamLog.VERSION}
+        val logs = s"""v${FakeCompactibleFileStreamLog.VERSION}
             |"entry_1"
             |"entry_2"
             |"entry_3"""".stripMargin
@@ -152,10 +152,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 
         assert(Nil ===
           compactibleLog.deserialize(
-            new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8))))
+            new ByteArrayInputStream(s"v${FakeCompactibleFileStreamLog.VERSION}".getBytes(UTF_8))))
       })
   }
 
+  test("deserialization log written by future version") {
+    withTempDir { dir =>
+      def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
+        new FakeCompactibleFileStreamLog(
+          version,
+          _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
+          _defaultCompactInterval = 3,         // this param does not matter here in this test case
+          _defaultMinBatchesToRetain = 1,      // this param does not matter here in this test case
+          spark,
+          dir.getCanonicalPath)
+
+      val writer = newFakeCompactibleFileStreamLog(version = 2)
+      val reader = newFakeCompactibleFileStreamLog(version = 1)
+      writer.add(0, Array("entry"))
+      val e = intercept[IllegalStateException] {
+        reader.get(0)
+      }
+      Seq(
+        "maximum supported log version is v1, but encountered v2",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   test("compact") {
     withFakeCompactibleFileStreamLog(
       fileCleanupDelayMs = Long.MaxValue,
@@ -219,6 +245,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
   ): Unit = {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
+        FakeCompactibleFileStreamLog.VERSION,
         fileCleanupDelayMs,
         defaultCompactInterval,
         defaultMinBatchesToRetain,
@@ -230,17 +257,18 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
 }
 
 object FakeCompactibleFileStreamLog {
-  val VERSION = "test_version"
+  val VERSION = 1
 }
 
 class FakeCompactibleFileStreamLog(
+    metadataLogVersion: Int,
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
     _defaultMinBatchesToRetain: Int,
     sparkSession: SparkSession,
     path: String)
   extends CompactibleFileStreamLog[String](
-    FakeCompactibleFileStreamLog.VERSION,
+    metadataLogVersion,
     sparkSession,
     path
   ) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 340d294..dd3a414 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -74,7 +74,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
           action = FileStreamSinkLog.ADD_ACTION))
 
       // scalastyle:off
-      val expected = s"""$VERSION
+      val expected = s"""v$VERSION
           |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
           |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
           |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -84,14 +84,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(expected === baos.toString(UTF_8.name()))
       baos.reset()
       sinkLog.serialize(Array(), baos)
-      assert(VERSION === baos.toString(UTF_8.name()))
+      assert(s"v$VERSION" === baos.toString(UTF_8.name()))
     }
   }
 
   test("deserialize") {
     withFileStreamSinkLog { sinkLog =>
       // scalastyle:off
-      val logs = s"""$VERSION
+      val logs = s"""v$VERSION
           |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
           |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
           |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
@@ -125,7 +125,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
 
       assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
 
-      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8))))
+      assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(s"v$VERSION".getBytes(UTF_8))))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 55750b9..662c446 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -127,6 +127,33 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("HDFSMetadataLog: parseVersion") {
+    withTempDir { dir =>
+      val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+      def assertLogFileMalformed(func: => Int): Unit = {
+        val e = intercept[IllegalStateException] { func }
+        assert(e.getMessage.contains(s"Log file was malformed: failed to read correct log version"))
+      }
+      assertLogFileMalformed { metadataLog.parseVersion("", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("xyz", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v10.x", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("10", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v0", 100) }
+      assertLogFileMalformed { metadataLog.parseVersion("v-10", 100) }
+
+      assert(metadataLog.parseVersion("v10", 10) === 10)
+      assert(metadataLog.parseVersion("v10", 100) === 10)
+
+      val e = intercept[IllegalStateException] { metadataLog.parseVersion("v200", 100) }
+      Seq(
+        "maximum supported log version is v100, but encountered v200",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   test("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)

http://git-wip-us.apache.org/repos/asf/spark/blob/2ea214dd/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 5ae8b24..f7f0dad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.io.File
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -70,6 +71,22 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("deserialization log written by future version") {
+    withTempDir { dir =>
+      stringToFile(new File(dir, "0"), "v99999")
+      val log = new OffsetSeqLog(spark, dir.getCanonicalPath)
+      val e = intercept[IllegalStateException] {
+        log.get(0)
+      }
+      Seq(
+        s"maximum supported log version is v${OffsetSeqLog.VERSION}, but encountered v99999",
+        "produced by a newer version of Spark and cannot be read by this version"
+      ).foreach { message =>
+        assert(e.getMessage.contains(message))
+      }
+    }
+  }
+
   test("read Spark 2.1.0 log format") {
     val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
     assert(batchId === 0)

