You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/18 21:37:07 UTC

spark git commit: [SPARK-24308][SQL] Handle DataReaderFactory to InputPartition rename in left over classes

Repository: spark
Updated Branches:
  refs/heads/master a53ea70c1 -> 710e4e81a


[SPARK-24308][SQL] Handle DataReaderFactory to InputPartition rename in left over classes

## What changes were proposed in this pull request?

SPARK-24073 renames DataReaderFactory -> InputPartition and DataReader -> InputPartitionReader. Some classes still reflect the old names, which causes confusion. This patch renames the leftover classes to reflect the new interfaces and fixes a few comments.

## How was this patch tested?

Existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan <ar...@apache.org>

Closes #21355 from arunmahadevan/SPARK-24308.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/710e4e81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/710e4e81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/710e4e81

Branch: refs/heads/master
Commit: 710e4e81a8efc1aacc14283fb57bc8786146f885
Parents: a53ea70
Author: Arun Mahadevan <ar...@apache.org>
Authored: Fri May 18 14:37:01 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri May 18 14:37:01 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaContinuousReader.scala      |  6 +++---
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala      |  4 ++--
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala |  2 +-
 .../sql/sources/v2/reader/ContinuousInputPartition.java |  4 ++--
 .../spark/sql/sources/v2/reader/InputPartition.java     |  6 +++---
 .../sql/sources/v2/reader/InputPartitionReader.java     |  6 +++---
 .../sql/execution/datasources/v2/DataSourceRDD.scala    |  6 +++---
 .../continuous/ContinuousRateStreamSource.scala         |  4 ++--
 .../apache/spark/sql/execution/streaming/memory.scala   |  4 ++--
 .../streaming/sources/ContinuousMemoryStream.scala      | 12 ++++++------
 .../streaming/sources/RateStreamMicroBatchReader.scala  |  4 ++--
 .../streaming/sources/RateStreamProviderSuite.scala     |  2 +-
 12 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 88abf8a..badaa69 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -106,7 +106,7 @@ class KafkaContinuousReader(
 
     startOffsets.toSeq.map {
       case (topicPartition, start) =>
-        KafkaContinuousDataReaderFactory(
+        KafkaContinuousInputPartition(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
           .asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
@@ -146,7 +146,7 @@ class KafkaContinuousReader(
 }
 
 /**
- * A data reader factory for continuous Kafka processing. This will be serialized and transformed
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
  * into a full reader on executors.
  *
  * @param topicPartition The (topic, partition) pair this task is responsible for.
@@ -156,7 +156,7 @@ class KafkaContinuousReader(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *                       are skipped.
  */
-case class KafkaContinuousDataReaderFactory(
+case class KafkaContinuousInputPartition(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8a37773..64ba987 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -143,7 +143,7 @@ private[kafka010] class KafkaMicroBatchReader(
 
     // Generate factories based on the offset ranges
     val factories = offsetRanges.map { range =>
-      new KafkaMicroBatchDataReaderFactory(
+      new KafkaMicroBatchInputPartition(
         range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
     factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
@@ -300,7 +300,7 @@ private[kafka010] class KafkaMicroBatchReader(
 }
 
 /** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+private[kafka010] case class KafkaMicroBatchInputPartition(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 871f970..c6412ea 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -679,7 +679,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
         )
         val factories = reader.planUnsafeInputPartitions().asScala
-          .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
+          .map(_.asInstanceOf[KafkaMicroBatchInputPartition])
         withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
           assert(factories.size == numPartitionsGenerated)
           factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) }

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
index c24f3b2..dcb8771 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java
@@ -27,9 +27,9 @@ import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
 @InterfaceStability.Evolving
 public interface ContinuousInputPartition<T> extends InputPartition<T> {
   /**
-   * Create a DataReader with particular offset as its startOffset.
+   * Create an input partition reader with particular offset as its startOffset.
    *
-   * @param offset offset want to set as the DataReader's startOffset.
+   * @param offset offset want to set as the input partition reader's startOffset.
    */
   InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
index 3524481..f53687e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java
@@ -36,8 +36,8 @@ import org.apache.spark.annotation.InterfaceStability;
 public interface InputPartition<T> extends Serializable {
 
   /**
-   * The preferred locations where the data reader returned by this partition can run faster,
-   * but Spark does not guarantee to run the data reader on these locations.
+   * The preferred locations where the input partition reader returned by this partition can run faster,
+   * but Spark does not guarantee to run the input partition reader on these locations.
    * The implementations should make sure that it can be run on any location.
    * The location is a string representing the host name.
    *
@@ -53,7 +53,7 @@ public interface InputPartition<T> extends Serializable {
   }
 
   /**
-   * Returns a data reader to do the actual reading work.
+   * Returns an input partition reader to do the actual reading work.
    *
    * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    * get retried until hitting the maximum retry times.

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
index 1b7051f..f0d8085 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -23,11 +23,11 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
+ * An input partition reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
  * outputting data for a RDD partition.
  *
- * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
- * source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal input
+ * partition readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for input partition
  * readers that mix in {@link SupportsScanUnsafeRow}.
  */
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index 1a6b324..8d6fb38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -29,12 +29,12 @@ class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: I
 
 class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val readerFactories: Seq[InputPartition[T]])
+    @transient private val inputPartitions: Seq[InputPartition[T]])
   extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
-    readerFactories.zipWithIndex.map {
-      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
+    inputPartitions.zipWithIndex.map {
+      case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
     }.toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 8d25d9c..516a563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -85,7 +85,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
       val start = partitionStartMap(i)
       // Have each partition advance by numPartitions each row, with starting points staggered
       // by their partition index.
-      RateStreamContinuousDataReaderFactory(
+      RateStreamContinuousInputPartition(
         start.value,
         start.runTimeMs,
         i,
@@ -113,7 +113,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
 }
 
-case class RateStreamContinuousDataReaderFactory(
+case class RateStreamContinuousInputPartition(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index daa2963..b137f98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
+        new MemoryStreamInputPartition(block).asInstanceOf[InputPartition[UnsafeRow]]
       }.asJava
     }
   }
@@ -201,7 +201,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 }
 
 
-class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
+class MemoryStreamInputPartition(records: Array[UnsafeRow])
   extends InputPartition[UnsafeRow] {
   override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
     new InputPartitionReader[UnsafeRow] {

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 4daafa6..d1c3498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -44,8 +44,8 @@ import org.apache.spark.util.RpcUtils
  *  * ContinuousMemoryStream maintains a list of records for each partition. addData() will
  *    distribute records evenly-ish across partitions.
  *  * RecordEndpoint is set up as an endpoint for executor-side
- *    ContinuousMemoryStreamDataReader instances to poll. It returns the record at the specified
- *    offset within the list, or null if that offset doesn't yet have a record.
+ *    ContinuousMemoryStreamInputPartitionReader instances to poll. It returns the record at
+ *    the specified offset within the list, or null if that offset doesn't yet have a record.
  */
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
   extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport {
@@ -106,7 +106,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
 
       startOffset.partitionNums.map {
         case (part, index) =>
-          new ContinuousMemoryStreamDataReaderFactory(
+          new ContinuousMemoryStreamInputPartition(
             endpointName, part, index): InputPartition[Row]
       }.toList.asJava
     }
@@ -157,9 +157,9 @@ object ContinuousMemoryStream {
 }
 
 /**
- * Data reader factory for continuous memory stream.
+ * An input partition for continuous memory stream.
  */
-class ContinuousMemoryStreamDataReaderFactory(
+class ContinuousMemoryStreamInputPartition(
     driverEndpointName: String,
     partition: Int,
     startOffset: Int) extends InputPartition[Row] {
@@ -168,7 +168,7 @@ class ContinuousMemoryStreamDataReaderFactory(
 }
 
 /**
- * Data reader for continuous memory stream.
+ * An input partition reader for continuous memory stream.
  *
  * Polls the driver endpoint for new records.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
index 723cc3a..fbff8db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
@@ -167,7 +167,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     }
 
     (0 until numPartitions).map { p =>
-      new RateStreamMicroBatchDataReaderFactory(
+      new RateStreamMicroBatchInputPartition(
         p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
         : InputPartition[Row]
     }.toList.asJava
@@ -182,7 +182,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     s"numPartitions=${options.get(NUM_PARTITIONS).orElse("default")}"
 }
 
-class RateStreamMicroBatchDataReaderFactory(
+class RateStreamMicroBatchInputPartition(
     partitionId: Int,
     numPartitions: Int,
     rangeStart: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/710e4e81/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 39a010f..bf72e5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -309,7 +309,7 @@ class RateSourceSuite extends StreamTest {
 
     val data = scala.collection.mutable.ListBuffer[Row]()
     tasks.asScala.foreach {
-      case t: RateStreamContinuousDataReaderFactory =>
+      case t: RateStreamContinuousInputPartition =>
         val startTimeMs = reader.getStartOffset()
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org