You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/18 21:37:07 UTC
spark git commit: [SPARK-24308][SQL] Handle DataReaderFactory to
InputPartition rename in left over classes
Repository: spark
Updated Branches:
refs/heads/master a53ea70c1 -> 710e4e81a
[SPARK-24308][SQL] Handle DataReaderFactory to InputPartition rename in left over classes
## What changes were proposed in this pull request?
SPARK-24073 renames DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflect the old names and cause confusion. This patch renames the leftover classes to reflect the new interface and fixes a few comments.
## How was this patch tested?
Existing unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Arun Mahadevan <ar...@apache.org>
Closes #21355 from arunmahadevan/SPARK-24308.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/710e4e81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/710e4e81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/710e4e81
Branch: refs/heads/master
Commit: 710e4e81a8efc1aacc14283fb57bc8786146f885
Parents: a53ea70
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri May 18 14:37:01 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri May 18 14:37:01 2018 -0700
----------------------------------------------------------------------
.../spark/sql/kafka010/KafkaContinuousReader.scala | 6 +++---
.../spark/sql/kafka010/KafkaMicroBatchReader.scala | 4 ++--
.../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 +-
.../sql/sources/v2/reader/ContinuousInputPartition.java | 4 ++--
.../spark/sql/sources/v2/reader/InputPartition.java | 6 +++---
.../sql/sources/v2/reader/InputPartitionReader.java | 6 +++---
.../sql/execution/datasources/v2/DataSourceRDD.scala | 6 +++---
.../continuous/ContinuousRateStreamSource.scala | 4 ++--
.../apache/spark/sql/execution/streaming/memory.scala | 4 ++--
.../streaming/sources/ContinuousMemoryStream.scala | 12 ++++++------
.../streaming/sources/RateStreamMicroBatchReader.scala | 4 ++--
.../streaming/sources/RateStreamProviderSuite.scala | 2 +-
12 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 88abf8a..badaa69 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -106,7 +106,7 @@ class KafkaContinuousReader(
startOffsets.toSeq.map {
case (topicPartition, start) =>
- KafkaContinuousDataReaderFactory(
+ KafkaContinuousInputPartition(
topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
.asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
@@ -146,7 +146,7 @@ class KafkaContinuousReader(
}
/**
- * A data reader factory for continuous Kafka processing. This will be serialized and transformed
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
* into a full reader on executors.
*
* @param topicPartition The (topic, partition) pair this task is responsible for.
@@ -156,7 +156,7 @@ class KafkaContinuousReader(
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
-case class KafkaContinuousDataReaderFactory(
+case class KafkaContinuousInputPartition(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8a37773..64ba987 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -143,7 +143,7 @@ private[kafka010] class KafkaMicroBatchReader(
// Generate factories based on the offset ranges
val factories = offsetRanges.map { range =>
- new KafkaMicroBatchDataReaderFactory(
+ new KafkaMicroBatchInputPartition(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
@@ -300,7 +300,7 @@ private[kafka010] class KafkaMicroBatchReader(
}
/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+private[kafka010] case class KafkaMicroBatchInputPartition(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 871f970..c6412ea 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -679,7 +679,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
)
val factories = reader.planUnsafeInputPartitions().asScala
- .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
+ .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
assert(factories.size == numPartitionsGenerated)
factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
index c24f3b2..dcb8771 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
@@ -27,9 +27,9 @@ import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
@InterfaceStability.Evolving
public interface ContinuousInputPartition<T> extends InputPartition<T> {
/**
- * Create a DataReader with particular offset as its startOffset.
+ * Create an input partition reader with particular offset as its startOffset.
*
- * @param offset offset want to set as the DataReader's startOffset.
+ * @param offset offset want to set as the input partition reader's startOffset.
*/
InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index 3524481..f53687e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -36,8 +36,8 @@ import org.apache.spark.annotation.InterfaceStability;
public interface InputPartition<T> extends Serializable {
/**
- * The preferred locations where the data reader returned by this partition can run faster,
- * but Spark does not guarantee to run the data reader on these locations.
+ * The preferred locations where the input partition reader returned by this partition can run faster,
+ * but Spark does not guarantee to run the input partition reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
*
@@ -53,7 +53,7 @@ public interface InputPartition<T> extends Serializable {
}
/**
- * Returns a data reader to do the actual reading work.
+ * Returns an input partition reader to do the actual reading work.
*
* If this method fails (by throwing an exception), the corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
index 1b7051f..f0d8085 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -23,11 +23,11 @@ import java.io.IOException;
import org.apache.spark.annotation.InterfaceStability;
/**
- * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
+ * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
* outputting data for a RDD partition.
*
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
- * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
+ * partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input partition
* readers that mix in {@link SupportsScanUnsafeRow}.
*/
@InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 1a6b324..8d6fb38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -29,12 +29,12 @@ class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: I
class DataSourceRDD[T: ClassTag](
sc: SparkContext,
- @transient private val readerFactories: Seq[InputPartition[T]])
+ @transient private val inputPartitions: Seq[InputPartition[T]])
extends RDD[T](sc, Nil) {
override protected def getPartitions: Array[Partition] = {
- readerFactories.zipWithIndex.map {
- case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+ inputPartitions.zipWithIndex.map {
+ case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
}.toArray
}
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 8d25d9c..516a563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -85,7 +85,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
val start = partitionStartMap(i)
// Have each partition advance by numPartitions each row, with starting points staggered
// by their partition index.
- RateStreamContinuousDataReaderFactory(
+ RateStreamContinuousInputPartition(
start.value,
start.runTimeMs,
i,
@@ -113,7 +113,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
}
-case class RateStreamContinuousDataReaderFactory(
+case class RateStreamContinuousInputPartition(
startValue: Long,
startTimeMs: Long,
partitionIndex: Int,
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index daa2963..b137f98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
newBlocks.map { block =>
- new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
+ new MemoryStreamInputPartition(block).asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
}
}
@@ -201,7 +201,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
-class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
+class MemoryStreamInputPartition(records: Array[UnsafeRow])
extends InputPartition[UnsafeRow] {
override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
new InputPartitionReader[UnsafeRow] {
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 4daafa6..d1c3498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -44,8 +44,8 @@ import org.apache.spark.util.RpcUtils
* * ContinuousMemoryStream maintains a list of records for each partition. addData() will
* distribute records evenly-ish across partitions.
* * RecordEndpoint is set up as an endpoint for executor-side
- * ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified
- * offset within the list, or null if that offset doesn't yet have a record.
+ * ContinuousMemoryStreamInputPartitionReader instances to poll. It returns the record at
+ * the specified offset within the list, or null if that offset doesn't yet have a record.
*/
class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
@@ -106,7 +106,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
startOffset.partitionNums.map {
case (part, index) =>
- new ContinuousMemoryStreamDataReaderFactory(
+ new ContinuousMemoryStreamInputPartition(
endpointName, part, index): InputPartition[Row]
}.toList.asJava
}
@@ -157,9 +157,9 @@ object ContinuousMemoryStream {
}
/**
- * Data reader factory for continuous memory stream.
+ * An input partition for continuous memory stream.
*/
-class ContinuousMemoryStreamDataReaderFactory(
+class ContinuousMemoryStreamInputPartition(
driverEndpointName: String,
partition: Int,
startOffset: Int) extends InputPartition[Row] {
@@ -168,7 +168,7 @@ class ContinuousMemoryStreamDataReaderFactory(
}
/**
- * Data reader for continuous memory stream.
+ * An input partition reader for continuous memory stream.
*
* Polls the driver endpoint for new records.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
index 723cc3a..fbff8db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -167,7 +167,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
}
(0 until numPartitions).map { p =>
- new RateStreamMicroBatchDataReaderFactory(
+ new RateStreamMicroBatchInputPartition(
p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
: InputPartition[Row]
}.toList.asJava
@@ -182,7 +182,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
}
-class RateStreamMicroBatchDataReaderFactory(
+class RateStreamMicroBatchInputPartition(
partitionId: Int,
numPartitions: Int,
rangeStart: Long,
http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 39a010f..bf72e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -309,7 +309,7 @@ class RateSourceSuite extends StreamTest {
val data = scala.collection.mutable.ListBuffer[Row]()
tasks.asScala.foreach {
- case t: RateStreamContinuousDataReaderFactory =>
+ case t: RateStreamContinuousInputPartition =>
val startTimeMs = reader.getStartOffset()
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org