You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/09 23:03:26 UTC

spark git commit: [SPARK-17829][SQL] Stable format for offset log

Repository: spark
Updated Branches:
  refs/heads/master 64fbdf1aa -> 3f62e1b5d


[SPARK-17829][SQL] Stable format for offset log

## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:
- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.

I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option.
## How was this patch tested?

Tests were added for KafkaSourceOffset in [KafkaSourceOffsetSuite](external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala) and for LongOffset in [OffsetSuite](sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala)

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

zsxwing marmbrus

Author: Tyson Condie <tc...@gmail.com>
Author: Tyson Condie <tc...@clash.local>

Closes #15626 from tcondie/spark-8360.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f62e1b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f62e1b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f62e1b5

Branch: refs/heads/master
Commit: 3f62e1b5d9e75dc07bac3aa4db3e8d0615cc3cc3
Parents: 64fbdf1
Author: Tyson Condie <tc...@gmail.com>
Authored: Wed Nov 9 15:03:22 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Nov 9 15:03:22 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  2 -
 .../apache/spark/sql/kafka010/KafkaSource.scala | 19 ++++-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  | 14 +++-
 .../sql/kafka010/KafkaSourceOffsetSuite.scala   | 55 +++++++++++++-
 python/pyspark/sql/streaming.py                 | 12 +--
 .../streaming/CompactibleFileStreamLog.scala    | 23 +++---
 .../execution/streaming/CompositeOffset.scala   | 50 ------------
 .../execution/streaming/FileStreamSinkLog.scala |  8 --
 .../execution/streaming/FileStreamSource.scala  |  4 +-
 .../streaming/FileStreamSourceLog.scala         |  8 --
 .../execution/streaming/HDFSMetadataLog.scala   | 22 +++---
 .../sql/execution/streaming/LongOffset.scala    | 21 ++++-
 .../spark/sql/execution/streaming/Offset.scala  | 36 ++++++++-
 .../sql/execution/streaming/OffsetSeq.scala     | 53 +++++++++++++
 .../sql/execution/streaming/OffsetSeqLog.scala  | 80 ++++++++++++++++++++
 .../spark/sql/execution/streaming/Source.scala  |  8 ++
 .../execution/streaming/StreamExecution.scala   | 11 ++-
 .../execution/streaming/StreamProgress.scala    |  4 +-
 .../spark/sql/execution/streaming/memory.scala  | 32 ++++----
 .../spark/sql/execution/streaming/socket.scala  | 25 +++---
 .../sql/streaming/StreamingQueryException.scala |  6 +-
 .../sql/streaming/StreamingQueryStatus.scala    |  6 +-
 .../execution/streaming/OffsetSeqLogSuite.scala | 63 +++++++++++++++
 .../spark/sql/streaming/OffsetSuite.scala       | 24 ++----
 .../streaming/StreamingQueryStatusSuite.scala   | 16 ++--
 .../sql/streaming/StreamingQuerySuite.scala     | 38 +++++-----
 26 files changed, 446 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 40d568a..13d7170 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.kafka010
 
-import java.io.Writer
-
 import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index b21508c..5bcc512 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -114,7 +116,22 @@ private[kafka010] case class KafkaSource(
    * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    */
   private lazy val initialPartitionOffsets = {
-    val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
+    val metadataLog =
+      new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+          val bytes = metadata.json.getBytes(StandardCharsets.UTF_8)
+          out.write(bytes.length)
+          out.write(bytes)
+        }
+
+        override def deserialize(in: InputStream): KafkaSourceOffset = {
+          val length = in.read()
+          val bytes = new Array[Byte](length)
+          in.read(bytes)
+          KafkaSourceOffset(SerializedOffset(new String(bytes, StandardCharsets.UTF_8)))
+        }
+      }
+
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5ade98..b5da415 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.Offset
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -27,9 +27,8 @@ import org.apache.spark.sql.execution.streaming.Offset
  */
 private[kafka010]
 case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
-  override def toString(): String = {
-    partitionToOffsets.toSeq.sortBy(_._1.toString).mkString("[", ", ", "]")
-  }
+
+  override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
 
 /** Companion object of the [[KafkaSourceOffset]] */
@@ -38,6 +37,7 @@ private[kafka010] object KafkaSourceOffset {
   def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
     offset match {
       case o: KafkaSourceOffset => o.partitionToOffsets
+      case so: SerializedOffset => KafkaSourceOffset(so).partitionToOffsets
       case _ =>
         throw new IllegalArgumentException(
           s"Invalid conversion from offset of ${offset.getClass} to KafkaSourceOffset")
@@ -51,4 +51,10 @@ private[kafka010] object KafkaSourceOffset {
   def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
     KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap)
   }
+
+  /**
+   * Returns [[KafkaSourceOffset]] from a JSON [[SerializedOffset]]
+   */
+  def apply(offset: SerializedOffset): KafkaSourceOffset =
+    KafkaSourceOffset(JsonUtils.partitionOffsets(offset.json))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index 7056a41..881018f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.kafka010
 
+import java.io.File
+
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.OffsetSuite
+import org.apache.spark.sql.test.SharedSQLContext
 
-class KafkaSourceOffsetSuite extends OffsetSuite {
+class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
 
   compare(
     one = KafkaSourceOffset(("t", 0, 1L)),
@@ -36,4 +40,53 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
   compare(
     one = KafkaSourceOffset(("t", 0, 1L)),
     two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+
+  val kso1 = KafkaSourceOffset(("t", 0, 1L))
+  val kso2 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L))
+  val kso3 = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 3L), ("t", 1, 4L))
+
+  compare(KafkaSourceOffset(SerializedOffset(kso1.json)),
+    KafkaSourceOffset(SerializedOffset(kso2.json)))
+
+  test("basic serialization - deserialization") {
+    assert(KafkaSourceOffset.getPartitionOffsets(kso1) ==
+      KafkaSourceOffset.getPartitionOffsets(SerializedOffset(kso1.json)))
+  }
+
+
+  testWithUninterruptibleThread("OffsetSeqLog serialization - deserialization") {
+    withTempDir { temp =>
+      // use non-existent directory to test whether log make the dir
+      val dir = new File(temp, "dir")
+      val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+      val batch0 = OffsetSeq.fill(kso1)
+      val batch1 = OffsetSeq.fill(kso2, kso3)
+
+      val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      assert(metadataLog.add(0, batch0))
+      assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+
+      assert(metadataLog.add(1, batch1))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+      // Adding the same batch does nothing
+      metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 1c94413..f326f16 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -220,7 +220,7 @@ class StreamingQueryStatus(object):
                 triggerId: 5
             Source statuses [1 source]:
                 Source 1 - MySource1
-                    Available offset: #0
+                    Available offset: 0
                     Input rate: 15.5 rows/sec
                     Processing rate: 23.5 rows/sec
                     Trigger details:
@@ -228,7 +228,7 @@ class StreamingQueryStatus(object):
                         latency.getOffset.source: 10
                         latency.getBatch.source: 20
             Sink status - MySink
-                Committed offsets: [#1, -]
+                Committed offsets: [1, -]
         """
         return self._jsqs.toString()
 
@@ -366,7 +366,7 @@ class SourceStatus(object):
 
         >>> print(sqs.sourceStatuses[0])
         Status of source MySource1
-            Available offset: #0
+            Available offset: 0
             Input rate: 15.5 rows/sec
             Processing rate: 23.5 rows/sec
             Trigger details:
@@ -396,7 +396,7 @@ class SourceStatus(object):
         Description of the current offset if known.
 
         >>> sqs.sourceStatuses[0].offsetDesc
-        u'#0'
+        u'0'
         """
         return self._jss.offsetDesc()
 
@@ -457,7 +457,7 @@ class SinkStatus(object):
 
         >>> print(sqs.sinkStatus)
         Status of sink MySink
-            Committed offsets: [#1, -]
+            Committed offsets: [1, -]
         """
         return self._jss.toString()
 
@@ -481,7 +481,7 @@ class SinkStatus(object):
         Description of the current offsets up to which data has been written by the sink.
 
         >>> sqs.sinkStatus.offsetDesc
-        u'[#1, -]'
+        u'[1, -]'
         """
         return self._jss.offsetDesc()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index b26edee..8af3db1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -24,6 +24,8 @@ import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.{Path, PathFilter}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
 
@@ -37,7 +39,7 @@ import org.apache.spark.sql.SparkSession
  * compact log files every 10 batches by default into a big file. When
  * doing a compaction, it will read all old log files and merge them with the new batch.
  */
-abstract class CompactibleFileStreamLog[T: ClassTag](
+abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     metadataLogVersion: String,
     sparkSession: SparkSession,
     path: String)
@@ -45,6 +47,11 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
 
   import CompactibleFileStreamLog._
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /** Needed to serialize type T into JSON when using Jackson */
+  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+
   /**
    * If we delete the old files after compaction at once, there is a race condition in S3: other
    * processes may see the old files are deleted but still cannot see the compaction file using
@@ -59,16 +66,6 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
   protected def compactInterval: Int
 
   /**
-   * Serialize the data into encoded string.
-   */
-  protected def serializeData(t: T): String
-
-  /**
-   * Deserialize the string into data object.
-   */
-  protected def deserializeData(encodedString: String): T
-
-  /**
    * Filter out the obsolete logs.
    */
   def compactLogs(logs: Seq[T]): Seq[T]
@@ -99,7 +96,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     out.write(metadataLogVersion.getBytes(UTF_8))
     logData.foreach { data =>
       out.write('\n')
-      out.write(serializeData(data).getBytes(UTF_8))
+      out.write(Serialization.write(data).getBytes(UTF_8))
     }
   }
 
@@ -112,7 +109,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
     if (version != metadataLogVersion) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    lines.map(deserializeData).toArray
+    lines.map(Serialization.read[T]).toArray
   }
 
   override def add(batchId: Long, logs: Array[T]): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
deleted file mode 100644
index ebc6ee8..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-/**
- * An ordered collection of offsets, used to track the progress of processing data from one or more
- * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
- * vector clock that must progress linearly forward.
- */
-case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
-  /**
-   * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
-   * sources.
-   *
-   * This method is typically used to associate a serialized offset with actual sources (which
-   * cannot be serialized).
-   */
-  def toStreamProgress(sources: Seq[Source]): StreamProgress = {
-    assert(sources.size == offsets.size)
-    new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
-  }
-
-  override def toString: String =
-    offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
-}
-
-object CompositeOffset {
-  /**
-   * Returns a [[CompositeOffset]] with a variable sequence of offsets.
-   * `nulls` in the sequence are converted to `None`s.
-   */
-  def fill(offsets: Offset*): CompositeOffset = {
-    CompositeOffset(offsets.map(Option(_)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index f9e2416..b4f1415 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,14 +93,6 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
 
-  protected override def serializeData(data: SinkFileStatus): String = {
-    write(data)
-  }
-
-  protected override def deserializeData(encodedString: String): SinkFileStatus = {
-    read[SinkFileStatus](encodedString)
-  }
-
   override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
     val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
     if (deletedFiles.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 680df01..8494aef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -131,8 +131,8 @@ class FileStreamSource(
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-    val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
-    val endId = end.asInstanceOf[LongOffset].offset
+    val startId = start.flatMap(LongOffset.convert(_)).getOrElse(LongOffset(-1L)).offset
+    val endId = LongOffset.convert(end).getOrElse(LongOffset(0)).offset
 
     assert(startId <= endId)
     val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 4681f2b..fe81b15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -60,14 +60,6 @@ class FileStreamSourceLog(
     }
   }
 
-  protected override def serializeData(data: FileEntry): String = {
-    Serialization.write(data)
-  }
-
-  protected override def deserializeData(encodedString: String): FileEntry = {
-    Serialization.read[FileEntry](encodedString)
-  }
-
   def compactLogs(logs: Seq[FileEntry]): Seq[FileEntry] = {
     logs
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9a0f87c..db7057d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{FileNotFoundException, InputStream, IOException, OutputStream}
+import java.io._
+import java.nio.charset.StandardCharsets
 import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
@@ -26,9 +27,10 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.UninterruptibleThread
 
@@ -44,9 +46,14 @@ import org.apache.spark.util.UninterruptibleThread
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
  * files in a directory always shows the latest files.
  */
-class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
   extends MetadataLog[T] with Logging {
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /** Needed to serialize type T into JSON when using Jackson */
+  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
+
   // Avoid serializing generic sequences, see SPARK-17372
   require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
     "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")
@@ -67,8 +74,6 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     override def accept(path: Path): Boolean = isBatchFile(path)
   }
 
-  private val serializer = new JavaSerializer(sparkSession.sparkContext.conf).newInstance()
-
   protected def batchIdToPath(batchId: Long): Path = {
     new Path(metadataPath, batchId.toString)
   }
@@ -88,14 +93,13 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
 
   protected def serialize(metadata: T, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    val outStream = serializer.serializeStream(out)
-    outStream.writeObject(metadata)
+    Serialization.write(metadata, out)
   }
 
   protected def deserialize(in: InputStream): T = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    val inStream = serializer.deserializeStream(in)
-    inStream.readObject[T]()
+    val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
+    Serialization.read[T](reader)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index c5e8827..5f0b195 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,8 +22,27 @@ package org.apache.spark.sql.execution.streaming
  */
 case class LongOffset(offset: Long) extends Offset {
 
+  override val json = offset.toString
+
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
+
+object LongOffset {
+
+  /**
+   * LongOffset factory from serialized offset.
+   * @return new LongOffset
+   */
+  def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
 
-  override def toString: String = s"#$offset"
+  /**
+   * Convert generic Offset to LongOffset if possible.
+   * @return converted LongOffset
+   */
+  def convert(offset: Offset): Option[LongOffset] = offset match {
+    case lo: LongOffset => Some(lo)
+    case so: SerializedOffset => Some(LongOffset(so))
+    case _ => None
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 1f52abf..4efcee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -23,4 +23,38 @@ package org.apache.spark.sql.execution.streaming
  * ordering of two [[Offset]] instances.  We do assume that if two offsets are `equal` then no
  * new data has arrived.
  */
-trait Offset extends Serializable {}
+abstract class Offset {
+
+  /**
+   * Equality based on JSON string representation. We leverage the
+   * JSON representation for normalization between the Offset's
+   * in memory and on disk representations.
+   */
+  override def equals(obj: Any): Boolean = obj match {
+    case o: Offset => this.json == o.json
+    case _ => false
+  }
+
+  override def hashCode(): Int = this.json.hashCode
+
+  override def toString(): String = this.json.toString
+
+  /**
+   * A JSON-serialized representation of an Offset that is
+   * used for saving offsets to the offset log.
+   * Note: We assume that equivalent/equal offsets serialize to
+   * identical JSON strings.
+   *
+   * @return JSON string encoding
+   */
+  def json: String
+}
+
+/**
+ * Used when loading a JSON serialized offset from external storage.
+ * We are currently not responsible for converting JSON serialized
+ * data into an internal (i.e., object) representation. Sources should
+ * define a factory method in their source Offset companion objects
+ * that accepts a [[SerializedOffset]] for doing the conversion.
+ */
+case class SerializedOffset(override val json: String) extends Offset

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
new file mode 100644
index 0000000..a4e1fe6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+
+/**
+ * An ordered collection of offsets, used to track the progress of processing data from one or more
+ * [[Source]]s that are present in a streaming query. This is similar to a simplified,
+ * single-instance vector clock that must progress linearly forward.
+ */
+case class OffsetSeq(offsets: Seq[Option[Offset]]) {
+
+  /**
+   * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
+   * sources.
+   *
+   * This method is typically used to associate a serialized offset with actual sources (which
+   * cannot be serialized).
+   */
+  def toStreamProgress(sources: Seq[Source]): StreamProgress = {
+    assert(sources.size == offsets.size)
+    new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
+  }
+
+  override def toString: String =
+    offsets.map(_.map(_.json).getOrElse("-")).mkString("[", ", ", "]")
+}
+
+object OffsetSeq {
+
+  /**
+   * Returns an [[OffsetSeq]] with a variable sequence of offsets.
+   * `null`s in the sequence are converted to `None`s.
+   */
+  def fill(offsets: Offset*): OffsetSeq = {
+    OffsetSeq(offsets.map(Option(_)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
new file mode 100644
index 0000000..d1c9d95
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * This class is used to log offsets to persistent files in HDFS.
+ * Each file corresponds to a specific batch of offsets. The file
+ * format contains a version string in the first line, followed
+ * by the JSON string representation of the offsets separated
+ * by a newline character. If a source offset is missing, then
+ * that line will contain a string value defined in the
+ * SERIALIZED_VOID_OFFSET variable in the [[OffsetSeqLog]] companion object.
+ * For instance, when dealing with [[LongOffset]] types:
+ *   v1   // version 1
+ *   0    // LongOffset 0
+ *   3    // LongOffset 3
+ *   -    // No offset for this source i.e., an invalid JSON string
+ *   2    // LongOffset 2
+ *   ...
+ */
+class OffsetSeqLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): OffsetSeq = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    def parseOffset(value: String): Offset = value match {
+      case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
+      case json => SerializedOffset(json)
+    }
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file")
+    }
+    val version = lines.next()
+    if (version != OffsetSeqLog.VERSION) {
+      throw new IllegalStateException(s"Unknown log version: ${version}")
+    }
+    OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
+  }
+
+  override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
+    metadata.offsets.map(_.map(_.json)).foreach { offset =>
+      out.write('\n')
+      offset match {
+        case Some(json: String) => out.write(json.getBytes(UTF_8))
+        case None => out.write(OffsetSeqLog.SERIALIZED_VOID_OFFSET.getBytes(UTF_8))
+      }
+    }
+  }
+}
+
+object OffsetSeqLog {
+  private val VERSION = "v1"
+  private val SERIALIZED_VOID_OFFSET = "-"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index f3bd5bf..75ffe90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -45,6 +45,14 @@ trait Source  {
    * Higher layers will always call this method with a value of `start` greater than or equal
    * to the last value passed to `commit` and a value of `end` less than or equal to the
    * last value returned by `getOffset`
+   *
+   * It is possible for the [[Offset]] type to be a [[SerializedOffset]] when it was
+   * obtained from the log. Moreover, [[StreamExecution]] only compares the [[Offset]]
+   * JSON representation to determine if the two objects are equal. This could have
+   * ramifications when upgrading [[Offset]] JSON formats, i.e., two equivalent [[Offset]]
+   * objects could differ between versions. Consequently, [[StreamExecution]] may call
+   * this method with two such equivalent [[Offset]] objects, in which case the [[Source]]
+   * should return an empty [[DataFrame]].
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 37af1a5..57e89f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -149,7 +148,7 @@ class StreamExecution(
    * processing is done.  Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
+  val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
@@ -249,7 +248,7 @@ class StreamExecution(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
-          Some(committedOffsets.toCompositeOffset(sources)))
+          Some(committedOffsets.toOffsetSeq(sources)))
         logError(s"Query $name terminated with error", e)
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
@@ -343,7 +342,7 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toOffsetSeq(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
@@ -684,14 +683,14 @@ class StreamExecution(
     val sourceStatuses = sources.map { s =>
       SourceStatus(
         s.toString,
-        localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available
+        localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
         streamMetrics.currentSourceInputRate(s),
         streamMetrics.currentSourceProcessingRate(s),
         streamMetrics.currentSourceTriggerDetails(s))
     }.toArray
     val sinkStatus = SinkStatus(
       sink.toString,
-      committedOffsets.toCompositeOffset(sources).toString)
+      committedOffsets.toOffsetSeq(sources).toString)
 
     currentStatus =
       StreamingQueryStatus(

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index db0bd9e..05a6547 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,8 +26,8 @@ class StreamProgress(
     val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
   extends scala.collection.immutable.Map[Source, Offset] {
 
-  def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
-    CompositeOffset(source.map(get))
+  def toOffsetSeq(source: Seq[Source]): OffsetSeq = {
+    OffsetSeq(source.map(get))
   }
 
   override def toString: String =

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 613c7cc..582b548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -106,8 +106,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
-      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
 
     // Internal buffer only holds the batches after lastCommittedOffset.
     val newBlocks = synchronized {
@@ -127,19 +127,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   override def commit(end: Offset): Unit = synchronized {
-    end match {
-      case newOffset: LongOffset =>
-        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
-        if (offsetDiff < 0) {
-          sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-        }
-
-        batches.trimStart(offsetDiff)
-        lastOffsetCommitted = newOffset
-      case _ =>
-        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
-          "an instance of this class")
+    def check(newOffset: LongOffset): Unit = {
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
+    }
+
+    LongOffset.convert(end) match {
+      case Some(lo) => check(lo)
+      case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
+        "that did not originate with an instance of this class")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 042977f..900d92b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -116,8 +116,8 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
     val startOrdinal =
-      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
-    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+      start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1
 
     // Internal buffer only holds the batches after lastOffsetCommitted
     val rawList = synchronized {
@@ -140,20 +140,19 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   }
 
   override def commit(end: Offset): Unit = synchronized {
-    if (end.isInstanceOf[LongOffset]) {
-      val newOffset = end.asInstanceOf[LongOffset]
-      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
-      if (offsetDiff < 0) {
-        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-      }
-
-      batches.trimStart(offsetDiff)
-      lastOffsetCommitted = newOffset
-    } else {
+    val newOffset = LongOffset.convert(end).getOrElse(
       sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
         s"originate with an instance of this class")
+    )
+
+    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
+
+    batches.trimStart(offsetDiff)
+    lastOffsetCommitted = newOffset
   }
 
   /** Stop this source. */

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index bd3e5a5..0a58142 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
+import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
 
 /**
  * :: Experimental ::
@@ -36,8 +36,8 @@ class StreamingQueryException private[sql](
     @transient val query: StreamingQuery,
     val message: String,
     val cause: Throwable,
-    val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None)
+    val startOffset: Option[OffsetSeq] = None,
+    val endOffset: Option[OffsetSeq] = None)
   extends Exception(message, cause) {
 
   /** Time when the exception occurred */

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index a50b0d9..99c7729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}
 import org.apache.spark.util.JsonProtocol
 
 /**
@@ -140,7 +140,7 @@ private[sql] object StreamingQueryStatus {
       sourceStatuses = Array(
         SourceStatus(
           desc = "MySource1",
-          offsetDesc = LongOffset(0).toString,
+          offsetDesc = LongOffset(0).json,
           inputRate = 15.5,
           processingRate = 23.5,
           triggerDetails = Map(
@@ -149,7 +149,7 @@ private[sql] object StreamingQueryStatus {
             SOURCE_GET_BATCH_LATENCY -> "20"))),
       sinkStatus = SinkStatus(
         desc = "MySink",
-        offsetDesc = CompositeOffset(Some(LongOffset(1)) :: None :: Nil).toString),
+        offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
       triggerDetails = Map(
         TRIGGER_ID -> "5",
         IS_TRIGGER_ACTIVE -> "true",

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
new file mode 100644
index 0000000..3afd11f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  /** test string offset type */
+  case class StringOffset(override val json: String) extends Offset
+
+  testWithUninterruptibleThread("serialization - deserialization") {
+    withTempDir { temp =>
+      val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
+    val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+      val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2))
+      val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three"))
+
+      val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
+        SerializedOffset(o.json))): _*)
+
+      assert(metadataLog.add(0, batch0))
+      assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+
+      assert(metadataLog.add(1, batch1))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+      // Adding the same batch does nothing
+      metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+      assert(metadataLog.get(0) === Some(batch0Serialized))
+      assert(metadataLog.get(1) === Some(batch1Serialized))
+      assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+      assert(metadataLog.get(None, Some(1)) ===
+        Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index b65a987..f208f9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Offset}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, SerializedOffset}
 
 trait OffsetSuite extends SparkFunSuite {
   /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
@@ -35,25 +35,11 @@ trait OffsetSuite extends SparkFunSuite {
 class LongOffsetSuite extends OffsetSuite {
   val one = LongOffset(1)
   val two = LongOffset(2)
+  val three = LongOffset(3)
   compare(one, two)
-}
-
-class CompositeOffsetSuite extends OffsetSuite {
-  compare(
-    one = CompositeOffset(Some(LongOffset(1)) :: Nil),
-    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
-  compare(
-    one = CompositeOffset(None :: Nil),
-    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
-  compare(
-    one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
-    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
-
-  compare(
-    one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
-    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
 
+  compare(LongOffset(SerializedOffset(one.json)),
+          LongOffset(SerializedOffset(three.json)))
 }
 
+

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
index 1a98cf2..6af19fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -24,7 +24,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
     assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
       """
         |Status of source MySource1
-        |    Available offset: #0
+        |    Available offset: 0
         |    Input rate: 15.5 rows/sec
         |    Processing rate: 23.5 rows/sec
         |    Trigger details:
@@ -36,7 +36,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
     assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
       """
         |Status of sink MySink
-        |    Committed offsets: [#1, -]
+        |    Committed offsets: [1, -]
       """.stripMargin.trim, "SinkStatus.toString does not match")
 
     assert(StreamingQueryStatus.testStatus.toString ===
@@ -56,7 +56,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
         |        triggerId: 5
         |    Source statuses [1 source]:
         |        Source 1 - MySource1
-        |            Available offset: #0
+        |            Available offset: 0
         |            Input rate: 15.5 rows/sec
         |            Processing rate: 23.5 rows/sec
         |            Trigger details:
@@ -64,7 +64,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
         |                latency.getOffset.source: 10
         |                latency.getBatch.source: 20
         |    Sink status - MySink
-        |        Committed offsets: [#1, -]
+        |        Committed offsets: [1, -]
       """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
 
   }
@@ -72,10 +72,10 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
   test("json") {
     assert(StreamingQueryStatus.testStatus.json ===
       """
-        |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
+        |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
         |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
         |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
-        |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
+        |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
       """.stripMargin.replace("\n", "").trim)
   }
 
@@ -86,7 +86,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
           |{
           |  "sourceStatuses" : [ {
           |    "description" : "MySource1",
-          |    "offsetDesc" : "#0",
+          |    "offsetDesc" : "0",
           |    "inputRate" : 15.5,
           |    "processingRate" : 23.5,
           |    "triggerDetails" : {
@@ -97,7 +97,7 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
           |  } ],
           |  "sinkStatus" : {
           |    "description" : "MySink",
-          |    "offsetDesc" : "[#1, -]"
+          |    "offsetDesc" : "[1, -]"
           |  }
           |}
         """.stripMargin.trim)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f62e1b5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 31b7fe0..e2e66d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -104,7 +104,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
       AssertOnQuery(
         q =>
-          q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)),
+          q.exception.get.startOffset.get === q.committedOffsets.toOffsetSeq(Seq(inputData)),
         "incorrect start offset on exception")
     )
   }
@@ -124,13 +124,13 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
       AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
       AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
       AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
+      AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
 
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -139,38 +139,38 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.processingRate >= 0.0),
       AssertOnQuery(_.status.sourceStatuses.length === 1),
       AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
       AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
       AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
       AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
       AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        CompositeOffset.fill(LongOffset(0)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+        OffsetSeq.fill(LongOffset(0)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
       AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
       AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
+      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
 
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
       AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        CompositeOffset.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
-      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+        OffsetSeq.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
+      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
 
       StopStream,
       AssertOnQuery(_.status.inputRate === 0.0),
       AssertOnQuery(_.status.processingRate === 0.0),
       AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
       AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        CompositeOffset.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+        OffsetSeq.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
       AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
       AssertOnQuery(_.status.triggerDetails.isEmpty),
 
       StartStream(),
@@ -179,15 +179,15 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.inputRate === 0.0),
       AssertOnQuery(_.status.processingRate === 0.0),
       AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
       AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        CompositeOffset.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+        OffsetSeq.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
       AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
       AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
+      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org