You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/03 04:49:13 UTC

spark git commit: [SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset

Repository: spark
Updated Branches:
  refs/heads/master 3ff83ad43 -> fe73cb4b4


[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset

## What changes were proposed in this pull request?

In the document of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.

## How was this patch tested?

N/A

Author: Wenchen Fan <we...@databricks.com>

Closes #20486 from cloud-fan/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe73cb4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe73cb4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe73cb4b

Branch: refs/heads/master
Commit: fe73cb4b439169f16cc24cd851a11fd398ce7edf
Parents: 3ff83ad
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Feb 2 20:49:08 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Feb 2 20:49:08 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala    | 2 +-
 .../spark/sql/sources/v2/reader/streaming/ContinuousReader.java  | 4 ++--
 .../sql/execution/streaming/continuous/ContinuousExecution.scala | 2 +-
 .../streaming/continuous/ContinuousRateStreamSource.scala        | 2 +-
 .../apache/spark/sql/execution/streaming/RateSourceV2Suite.scala | 2 +-
 .../spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala | 2 +-
 6 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 41c443b..b049a05 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -71,7 +71,7 @@ class KafkaContinuousReader(
   override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
 
   private var offset: Offset = _
-  override def setOffset(start: ju.Optional[Offset]): Unit = {
+  override def setStartOffset(start: ju.Optional[Offset]): Unit = {
     offset = start.orElse {
       val offsets = initialOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
index d1d1e7f..7fe7f00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -51,12 +51,12 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
      * start from the first record after the provided offset, or from an implementation-defined
      * inferred starting point if no offset is provided.
      */
-    void setOffset(Optional<Offset> start);
+    void setStartOffset(Optional<Offset> start);
 
     /**
      * Return the specified or inferred start offset for this reader.
      *
-     * @throws IllegalStateException if setOffset has not been called
+     * @throws IllegalStateException if setStartOffset has not been called
      */
     Offset getStartOffset();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 08c8141..ed22b91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -181,7 +181,7 @@ class ContinuousExecution(
 
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
-        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+        reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
         new StreamingDataSourceV2Relation(newOutput, reader)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 0eaaa48..b63d8d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -61,7 +61,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   private var offset: Offset = _
 
-  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
     this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index 3158995..0d68d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -160,7 +160,7 @@ class RateSourceV2Suite extends StreamTest {
   test("continuous data") {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
-    reader.setOffset(Optional.empty())
+    reader.setStartOffset(Optional.empty())
     val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 2)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index cb873ab..51f44fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -43,7 +43,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def readSchema(): StructType = StructType(Seq())
   def stop(): Unit = {}
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  def setOffset(start: Optional[Offset]): Unit = {}
+  def setStartOffset(start: Optional[Offset]): Unit = {}
 
   def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org