You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/03 04:49:13 UTC
spark git commit: [SPARK-23317][SQL] rename
ContinuousReader.setOffset to setStartOffset
Repository: spark
Updated Branches:
refs/heads/master 3ff83ad43 -> fe73cb4b4
[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset
## What changes were proposed in this pull request?
In the document of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.
## How was this patch tested?
N/A
Author: Wenchen Fan <we...@databricks.com>
Closes #20486 from cloud-fan/rename.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe73cb4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe73cb4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe73cb4b
Branch: refs/heads/master
Commit: fe73cb4b439169f16cc24cd851a11fd398ce7edf
Parents: 3ff83ad
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Feb 2 20:49:08 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Feb 2 20:49:08 2018 -0800
----------------------------------------------------------------------
.../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala | 2 +-
.../spark/sql/sources/v2/reader/streaming/ContinuousReader.java | 4 ++--
.../sql/execution/streaming/continuous/ContinuousExecution.scala | 2 +-
.../streaming/continuous/ContinuousRateStreamSource.scala | 2 +-
.../apache/spark/sql/execution/streaming/RateSourceV2Suite.scala | 2 +-
.../spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala | 2 +-
6 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 41c443b..b049a05 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -71,7 +71,7 @@ class KafkaContinuousReader(
override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
private var offset: Offset = _
- override def setOffset(start: ju.Optional[Offset]): Unit = {
+ override def setStartOffset(start: ju.Optional[Offset]): Unit = {
offset = start.orElse {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
index d1d1e7f..7fe7f00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -51,12 +51,12 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
* start from the first record after the provided offset, or from an implementation-defined
* inferred starting point if no offset is provided.
*/
- void setOffset(Optional<Offset> start);
+ void setStartOffset(Optional<Offset> start);
/**
* Return the specified or inferred start offset for this reader.
*
- * @throws IllegalStateException if setOffset has not been called
+ * @throws IllegalStateException if setStartOffset has not been called
*/
Offset getStartOffset();
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 08c8141..ed22b91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -181,7 +181,7 @@ class ContinuousExecution(
val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
- reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
new StreamingDataSourceV2Relation(newOutput, reader)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 0eaaa48..b63d8d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -61,7 +61,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
private var offset: Offset = _
- override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+ override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index 3158995..0d68d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -160,7 +160,7 @@ class RateSourceV2Suite extends StreamTest {
test("continuous data") {
val reader = new RateStreamContinuousReader(
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
- reader.setOffset(Optional.empty())
+ reader.setStartOffset(Optional.empty())
val tasks = reader.createDataReaderFactories()
assert(tasks.size == 2)
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index cb873ab..51f44fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -43,7 +43,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
def readSchema(): StructType = StructType(Seq())
def stop(): Unit = {}
def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
- def setOffset(start: Optional[Offset]): Unit = {}
+ def setStartOffset(start: Optional[Offset]): Unit = {}
def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
throw new IllegalStateException("fake source - cannot actually read")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org