You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/09/23 19:17:58 UTC

spark git commit: [SPARK-17643] Remove comparable requirement from Offset

Repository: spark
Updated Branches:
  refs/heads/master f62ddc598 -> 988c71457


[SPARK-17643] Remove comparable requirement from Offset

For some sources, it is difficult to provide a global ordering based only on the data in the offset.  Since we don't use comparison for correctness, lets remove it.

Author: Michael Armbrust <mi...@databricks.com>

Closes #15207 from marmbrus/removeComparable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/988c7145
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/988c7145
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/988c7145

Branch: refs/heads/master
Commit: 988c71457354b0a443471f501cef544a85b1a76a
Parents: f62ddc5
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Sep 23 12:17:59 2016 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Sep 23 12:17:59 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/CompositeOffset.scala   | 30 ---------------
 .../sql/execution/streaming/LongOffset.scala    |  6 ---
 .../spark/sql/execution/streaming/Offset.scala  | 19 ++--------
 .../execution/streaming/StreamExecution.scala   |  9 +++--
 .../spark/sql/streaming/OffsetSuite.scala       | 39 --------------------
 5 files changed, 9 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 729c846..ebc6ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -24,36 +24,6 @@ package org.apache.spark.sql.execution.streaming
  */
 case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
   /**
-   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
-   * or greater than the specified object.
-   */
-  override def compareTo(other: Offset): Int = other match {
-    case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
-      val comparisons = offsets.zip(otherComposite.offsets).map {
-        case (Some(a), Some(b)) => a compareTo b
-        case (None, None) => 0
-        case (None, _) => -1
-        case (_, None) => 1
-      }
-      val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
-      nonZeroSigns.size match {
-        case 0 => 0                       // if both empty or only 0s
-        case 1 => nonZeroSigns.head       // if there are only (0s and 1s) or (0s and -1s)
-        case _ =>                         // there are both 1s and -1s
-          throw new IllegalArgumentException(
-            s"Invalid comparison between non-linear histories: $this <=> $other")
-      }
-    case _ =>
-      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
-  }
-
-  private def sign(num: Int): Int = num match {
-    case i if i < 0 => -1
-    case i if i == 0 => 0
-    case i if i > 0 => 1
-  }
-
-  /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    * sources.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index bb17640..c5e8827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming
  */
 case class LongOffset(offset: Long) extends Offset {
 
-  override def compareTo(other: Offset): Int = other match {
-    case l: LongOffset => offset.compareTo(l.offset)
-    case _ =>
-      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
-  }
-
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 2cc0128..1f52abf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -19,19 +19,8 @@ package org.apache.spark.sql.execution.streaming
 
 /**
  * An offset is a monotonically increasing metric used to track progress in the computation of a
- * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
- * with `equals` and `hashcode`.
+ * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global
+ * ordering of two [[Offset]] instances.  We do assume that if two offsets are `equal` then no
+ * new data has arrived.
  */
-trait Offset extends Serializable {
-
-  /**
-   * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
-   * or greater than the specified object.
-   */
-  def compareTo(other: Offset): Int
-
-  def >(other: Offset): Boolean = compareTo(other) > 0
-  def <(other: Offset): Boolean = compareTo(other) < 0
-  def <=(other: Offset): Boolean = compareTo(other) <= 0
-  def >=(other: Offset): Boolean = compareTo(other) >= 0
-}
+trait Offset extends Serializable {}

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77d..9825f19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -259,7 +259,7 @@ class StreamExecution(
       case (source, available) =>
         committedOffsets
             .get(source)
-            .map(committed => committed < available)
+            .map(committed => committed != available)
             .getOrElse(true)
     }
   }
@@ -318,7 +318,8 @@ class StreamExecution(
 
     // Request unprocessed data from all sources.
     val newData = availableOffsets.flatMap {
-      case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+      case (source, available)
+          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
         val current = committedOffsets.get(source)
         val batch = source.getBatch(current, available)
         logDebug(s"Retrieving data from $source: $current -> $available")
@@ -404,10 +405,10 @@ class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is indented for use primarily when writing tests.
    */
-  def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {
       val localCommittedOffsets = committedOffsets
-      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
     }
 
     while (notDone) {

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index 9590af4..b65a987 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -24,44 +24,12 @@ trait OffsetSuite extends SparkFunSuite {
   /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
   def compare(one: Offset, two: Offset): Unit = {
     test(s"comparison $one <=> $two") {
-      assert(one < two)
-      assert(one <= two)
-      assert(one <= one)
-      assert(two > one)
-      assert(two >= one)
-      assert(one >= one)
       assert(one == one)
       assert(two == two)
       assert(one != two)
       assert(two != one)
     }
   }
-
-  /** Creates test to check that non-equality comparisons throw exception. */
-  def compareInvalid(one: Offset, two: Offset): Unit = {
-    test(s"invalid comparison $one <=> $two") {
-      intercept[IllegalArgumentException] {
-        assert(one < two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one <= two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one > two)
-      }
-
-      intercept[IllegalArgumentException] {
-        assert(one >= two)
-      }
-
-      assert(!(one == two))
-      assert(!(two == one))
-      assert(one != two)
-      assert(two != one)
-    }
-  }
 }
 
 class LongOffsetSuite extends OffsetSuite {
@@ -79,10 +47,6 @@ class CompositeOffsetSuite extends OffsetSuite {
     one = CompositeOffset(None :: Nil),
     two = CompositeOffset(Some(LongOffset(2)) :: Nil))
 
-  compareInvalid(                                               // sizes must be same
-    one = CompositeOffset(Nil),
-    two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
   compare(
     one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
     two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
@@ -91,8 +55,5 @@ class CompositeOffsetSuite extends OffsetSuite {
     one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
     two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
 
-  compareInvalid(
-    one = CompositeOffset.fill(LongOffset(2), LongOffset(1)),   // vector time inconsistent
-    two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org