You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/09/25 16:59:25 UTC

[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/22547

    [SPARK-25528][SQL] data source V2 read side API refactoring

    ## What changes were proposed in this pull request?
    
    Refactor the read side API according to the abstraction proposed in the [dev list](http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html)
    
    ```
    batch: catalog -> table -> scan
    streaming: catalog -> table -> stream -> scan
    ```
    
    More concretely, this PR
    1. add a new interface called `Format` that can return `Table`
    2. rename `ReadSupportProvider` to `Table`, represents a logical data set, with a schema.
    3. add a new interface `InputStream` to represent a streaming source in a streaming query. It can create `Scan`s.
    4. rename `ReadSupport` to `Scan`. Each `Scan` triggers one Spark job. (like an RDD)
    
    ## How was this patch tested?
    
    existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark new-idea

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22547
    
----
commit 92dfdaf990f2676d49766f5ab094e8b8a9a755b1
Author: Wenchen Fan <we...@...>
Date:   2018-08-27T15:20:08Z

    data source V2 read side API refactoring

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97538 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97538/testReport)** for PR 22547 at commit [`f2ea923`](https://github.com/apache/spark/commit/f2ea923d91ec6fb1a2777dee4bcb728120000ef6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226785695
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchScan.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.v2.DataSourceOptions;
    +import org.apache.spark.sql.sources.v2.SupportsBatchRead;
    +import org.apache.spark.sql.sources.v2.Table;
    +
    +/**
    + * A {@link Scan} for batch queries.
    + *
    + * The execution engine will get an instance of {@link Table} first, then call
    + * {@link Table#newScanConfigBuilder(DataSourceOptions)} and create an instance of
    + * {@link ScanConfig}. The {@link ScanConfigBuilder} can apply operator pushdown and keep the
    + * pushdown result in {@link ScanConfig}. Then
    + * {@link SupportsBatchRead#createBatchScan(ScanConfig, DataSourceOptions)} will be called to create
    + * a {@link BatchScan} instance, which will be used to create input partitions and reader factory to
    + * scan data from the data source with a Spark job.
    + */
    +@InterfaceStability.Evolving
    +public interface BatchScan extends Scan {
    +
    +  /**
    +   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
    +   */
    +  PartitionReaderFactory createReaderFactory();
    --- End diff --
    
    Why are `BatchScan` and `PartitionReaderFactory` different interfaces?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226798213
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.DataSourceRegister;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
    + *
    + * The major responsibility of this interface is to return a {@link Table} for read/write.
    + */
    +@InterfaceStability.Evolving
    +public interface Format extends DataSourceV2 {
    --- End diff --
    
    Why is there both Format and DataSourceV2? What does DataSourceV2 do?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    I agree that there is consensus for the proposal in the design doc and I don't think there are any blockers. If there's something I can do to help, please let me know. Otherwise ping me to review!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3448/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3854/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96563 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96563/testReport)** for PR 22547 at commit [`92dfdaf`](https://github.com/apache/spark/commit/92dfdaf990f2676d49766f5ab094e8b8a9a755b1).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    @cloud-fan, is there a design doc that outlines these changes and the new API structure?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97538 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97538/testReport)** for PR 22547 at commit [`f2ea923`](https://github.com/apache/spark/commit/f2ea923d91ec6fb1a2777dee4bcb728120000ef6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan closed the pull request at:

    https://github.com/apache/spark/pull/22547


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226790252
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/InputStream.java ---
    @@ -17,14 +17,18 @@
     
     package org.apache.spark.sql.sources.v2.reader.streaming;
     
    -import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
     
     /**
    - * A base interface for streaming read support. This is package private and is invisible to data
    - * sources. Data sources should implement concrete streaming read support interfaces:
    - * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + * An interface representing a readable data stream in a streaming query. It's responsible to manage
    + * the offsets of the streaming source in this streaming query.
    + *
    + * Data sources should implement concrete input stream interfaces: {@link MicroBatchInputStream} and
    + * {@link ContinuousInputStream}.
      */
    -interface StreamingReadSupport extends ReadSupport {
    +@InterfaceStability.Evolving
    +public interface InputStream extends BaseStreamingSource {
    --- End diff --
    
    `InputStream` conflicts with a well-known JVM class, [`java.io.InputStream`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html). I think this should be renamed to be more specific to a streaming table scan.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96567 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96567/testReport)** for PR 22547 at commit [`a349686`](https://github.com/apache/spark/commit/a3496862956f2ccc03ed076c9a9fbe1648be4c3b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96570/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4093/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220275016
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           failOnDataLoss(caseInsensitiveParams))
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
    -   * batches of Kafka data in a micro-batch streaming query.
    -   */
    -  override def createMicroBatchReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
    -
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    -
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    -
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    -
    -    new KafkaMicroBatchReadSupport(
    -      kafkaOffsetReader,
    -      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    -      options,
    -      metadataPath,
    -      startingStreamOffsets,
    -      failOnDataLoss(caseInsensitiveParams))
    +  override def getTable(options: DataSourceOptions): KafkaTable.type = {
    +    KafkaTable
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
    -   * Kafka data in a continuous streaming query.
    -   */
    -  override def createContinuousReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaContinuousReadSupport = {
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    +  object KafkaTable extends Table
    +    with SupportsMicroBatchRead with SupportsContinuousRead {
     
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    +    override def schema(): StructType = KafkaOffsetReader.kafkaSchema
     
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    +    /**
    +     * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to read
    +     * batches of Kafka data in a micro-batch streaming query.
    +     */
    +    override def createMicroBatchInputStream(
    +        checkpointLocation: String,
    +        config: ScanConfig,
    +        options: DataSourceOptions): MicroBatchInputStream = {
    +      val parameters = options.asMap().asScala.toMap
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/22547/files#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05L117


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226361309
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala ---
    @@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest {
           "rate source does not support user-specified schema"))
       }
     
    -  test("continuous in registry") {
    --- End diff --
    
    we don't need this test now. With the new `Format` abstraction, the lookup logic is unified between microbatch and continuous


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97540/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220275520
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---
    @@ -207,13 +207,13 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
             testUtils.createTopic(topic2, partitions = 5)
             eventually(timeout(streamingTimeout)) {
               assert(
    -            query.lastExecution.executedPlan.collectFirst {
    -              case scan: DataSourceV2ScanExec
    -                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    -                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
    -            }.exists { config =>
    +            query.lastExecution.logical.collectFirst {
    --- End diff --
    
    now the known partitions is tracked by the `KafkaContinuousInputStream` in logical plan: https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    @cloud-fan, sorry to look at this so late, I was out on vacation for a little while. Is this about ready for review?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226796934
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -173,12 +185,17 @@ object DataSourceV2Relation {
           source: DataSourceV2,
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
    -      userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    -    val output = readSupport.fullSchema().toAttributes
    +      userSpecifiedSchema: Option[StructType] = None): Option[DataSourceV2Relation] = {
    --- End diff --
    
    This shouldn't return an option. A relation is not a read-side structure, it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method.
    
    Creating a relation from a Table should always work, even if the table isn't readable or isn't writable. Analysis can be done later to validate whether the plan that contains a relation can actually use the table.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97206 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97206/testReport)** for PR 22547 at commit [`a35d98c`](https://github.com/apache/spark/commit/a35d98cc676c9b3c3b39a45d4b03b43dfe9d0767).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226355931
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -90,6 +140,8 @@ class ContinuousExecution(
         do {
           runContinuous(sparkSessionForStream)
         } while (state.updateAndGet(stateUpdate) == ACTIVE)
    +
    +    stopSources()
    --- End diff --
    
    with the new abstraction, we should only stop sources when the stream query ends, instead of each reconfiguration.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r230505785
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReadSupport(
    +class KafkaContinuousInputStream(
    --- End diff --
    
    +1 for this. A lot of the changes right now are for moving around the streaming code especially, which makes it harder to isolate just the proposed API for review.
    
    An alternative is to make this PR separate commits that, while the commits themselves may not compile because of mismatching signatures - but all the commits taken together would compile, and each commit can be reviewed individually for assessing the API and then the implementation.
    
    For example I'd propose 3 PRs:
    
    * Batch reading, with a commit for the interface changes and a separate commit for the implementation changes
    * Micro Batch Streaming read, with a commit for the interface changes and a separate commit for the implementation changes
    * Continuous streaming read, similar to above
    
    Thoughts?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96570 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96570/testReport)** for PR 22547 at commit [`59d0abb`](https://github.com/apache/spark/commit/59d0abb74c461e3afcb83b69a4a6d48a2ee5197c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    I was stuck with some personal business recently, I'll send a PR for batch source after the weekend.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3444/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    After looking at the changes, I want to reiterate that request for a design doc. I think that code is a great way to prototype a design, but that we need to step back and make sure that the design makes sense when you view it from a high level.
    
    I have two main motivations for that point. First, there are some classes that I don't see a justification for, like having a separate ScanConfig, BatchScan, and PartitionReaderFactory. Are all of those separate classes necessary? Can a ScanConfigBuilder return a BatchScan? Can BatchScan expose a createBatchReader(InputPartition) method?
    
    My second motivation for saying we need a clear design doc is that I think that the current way to interact with v2 doesn't fit well with catalogs. This is based around Format, which is based on the v1 method of loading read and write implementations. But that isn't the primary way that v2 will be used be used. It happens to be the only way to call into the v2 API from Spark today, but the primary use of v2 is to integrate sources that are actually modeled as tables in some catalog.
    
    For example, Format exposes getTable that returns a Table implementation from DataSourceOptions. Those options have tableName and databaseName methods. But tables that are identified by name shouldn't be loaded by a Format, they should be loaded by a catalog. It also uses the options for both table options and read options because there isn't a way to pass both. But most tables will be created with table options by a catalog and will accept read-specific options passed to the DataFrameReader.
    
    I think we would approach a usable API much sooner if this work was planned based on a shared understanding of how catalogs and tables will interact in the future. Not having a catalog API right now is affecting the way tables work in this PR, and that's a concern for me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226783272
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           failOnDataLoss(caseInsensitiveParams))
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
    -   * batches of Kafka data in a micro-batch streaming query.
    -   */
    -  override def createMicroBatchReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
    -
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    -
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    -
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    -
    -    new KafkaMicroBatchReadSupport(
    -      kafkaOffsetReader,
    -      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    -      options,
    -      metadataPath,
    -      startingStreamOffsets,
    -      failOnDataLoss(caseInsensitiveParams))
    +  override def getTable(options: DataSourceOptions): KafkaTable.type = {
    +    KafkaTable
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
    -   * Kafka data in a continuous streaming query.
    -   */
    -  override def createContinuousReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaContinuousReadSupport = {
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    +  object KafkaTable extends Table
    --- End diff --
    
    Why is `KafkaTable` an object, not a class? This doesn't seem to fit an abstraction.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Hi @rdblue welcome back! I just rebased it so it's ready for review :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220274862
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchInputStream.scala ---
    @@ -294,6 +227,88 @@ private[kafka010] class KafkaMicroBatchReadSupport(
       }
     }
     
    +private[kafka010] class KafkaMicroBatchScan(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    rangeCalculator: KafkaOffsetRangeCalculator,
    +    executorKafkaParams: ju.Map[String, Object],
    +    pollTimeoutMs: Long,
    +    failOnDataLoss: Boolean,
    +    reportDataLoss: String => Unit,
    +    start: KafkaSourceOffset,
    +    end: KafkaSourceOffset) extends MicroBatchScan with Logging {
    +
    +  override def createReaderFactory(): PartitionReaderFactory = {
    +    KafkaMicroBatchReaderFactory
    +  }
    +
    +  override def planInputPartitions(): Array[InputPartition] = {
    +    val startPartitionOffsets = start.partitionToOffsets
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/22547/files#diff-314d02b954fc05ec7ae687dd486a8e84L104


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226812577
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.DataSourceRegister;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
    + *
    + * The major responsibility of this interface is to return a {@link Table} for read/write.
    + */
    +@InterfaceStability.Evolving
    +public interface Format extends DataSourceV2 {
    --- End diff --
    
    the write API has not been migrated and still need `DataSourceV2`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Let's move the high-level discussion to https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96567/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97520 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97520/testReport)** for PR 22547 at commit [`328bccd`](https://github.com/apache/spark/commit/328bccd68d13318c680d66c6e5e5bca8b5d10923).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4095/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96566 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96566/testReport)** for PR 22547 at commit [`f08e02a`](https://github.com/apache/spark/commit/f08e02a6a8fdb070a01e3f2ac01b62a7d6f8dd9e).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220275173
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
           failOnDataLoss(caseInsensitiveParams))
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
    -   * batches of Kafka data in a micro-batch streaming query.
    -   */
    -  override def createMicroBatchReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
    -
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    -
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    -
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    -
    -    new KafkaMicroBatchReadSupport(
    -      kafkaOffsetReader,
    -      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    -      options,
    -      metadataPath,
    -      startingStreamOffsets,
    -      failOnDataLoss(caseInsensitiveParams))
    +  override def getTable(options: DataSourceOptions): KafkaTable.type = {
    +    KafkaTable
       }
     
    -  /**
    -   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
    -   * Kafka data in a continuous streaming query.
    -   */
    -  override def createContinuousReadSupport(
    -      metadataPath: String,
    -      options: DataSourceOptions): KafkaContinuousReadSupport = {
    -    val parameters = options.asMap().asScala.toMap
    -    validateStreamOptions(parameters)
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    -    val specifiedKafkaParams =
    -      parameters
    -        .keySet
    -        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    -        .map { k => k.drop(6).toString -> parameters(k) }
    -        .toMap
    +  object KafkaTable extends Table
    +    with SupportsMicroBatchRead with SupportsContinuousRead {
     
    -    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
    -      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    +    override def schema(): StructType = KafkaOffsetReader.kafkaSchema
     
    -    val kafkaOffsetReader = new KafkaOffsetReader(
    -      strategy(caseInsensitiveParams),
    -      kafkaParamsForDriver(specifiedKafkaParams),
    -      parameters,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    +    /**
    +     * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchInputStream]] to read
    +     * batches of Kafka data in a micro-batch streaming query.
    +     */
    +    override def createMicroBatchInputStream(
    +        checkpointLocation: String,
    +        config: ScanConfig,
    +        options: DataSourceOptions): MicroBatchInputStream = {
    +      val parameters = options.asMap().asScala.toMap
    +      validateStreamOptions(parameters)
    +      // Each running query should use its own group id. Otherwise, the query may be only assigned
    +      // partial data since Kafka will assign partitions to multiple consumers having the same group
    +      // id. Hence, we should generate a unique id for each query.
    +      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${checkpointLocation.hashCode}"
    +
    +      val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    +      val specifiedKafkaParams =
    +        parameters
    +          .keySet
    +          .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    +          .map { k => k.drop(6).toString -> parameters(k) }
    +          .toMap
    +
    +      val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
    +        caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
    +
    +      val kafkaOffsetReader = new KafkaOffsetReader(
    +        strategy(caseInsensitiveParams),
    +        kafkaParamsForDriver(specifiedKafkaParams),
    +        parameters,
    +        driverGroupIdPrefix = s"$uniqueGroupId-driver")
    +
    +      new KafkaMicroBatchInputStream(
    +        kafkaOffsetReader,
    +        kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    +        options,
    +        checkpointLocation,
    +        startingStreamOffsets,
    +        failOnDataLoss(caseInsensitiveParams))
    +    }
     
    -    new KafkaContinuousReadSupport(
    -      kafkaOffsetReader,
    -      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    -      parameters,
    -      metadataPath,
    -      startingStreamOffsets,
    -      failOnDataLoss(caseInsensitiveParams))
    +    /**
    +     * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputStream]] to read
    +     * Kafka data in a continuous streaming query.
    +     */
    +    override def createContinuousInputStream(
    +        checkpointLocation: String,
    +        config: ScanConfig,
    +        options: DataSourceOptions): ContinuousInputStream = {
    +      val parameters = options.asMap().asScala.toMap
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/22547/files#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05L157


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97538/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226789610
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.v2.reader.BatchScan;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to
    + * provide data reading ability for batch processing.
    + */
    +@InterfaceStability.Evolving
    +public interface SupportsBatchRead extends Table {
    +
    +  /**
    +   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options.
    +   *
    +   * @param config a {@link ScanConfig} which may contains operator pushdown information.
    +   * @param options the user-specified options, which is same as the one used to create the
    +   *                {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
    +   */
    +  BatchScan createBatchScan(ScanConfig config, DataSourceOptions options);
    --- End diff --
    
    Is there a benefit to having both `ScanConfig` and `BatchScan` objects? Why not have `ScanConfigBuilder` return a `BatchScan` directly by calling `buildBatch`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    @cloud-fan @rdblue I believe we've converged on an appropriate API as per our last sync. Do we have a plan to move this forward, with the separated smaller patches?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97540 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97540/testReport)** for PR 22547 at commit [`9f63721`](https://github.com/apache/spark/commit/9f63721677cea627f43f7d536bb32b588cee30a3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3445/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226359031
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchInputStream.scala ---
    @@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLoca
             s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
       }
     
    +  private val numPartitions = {
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/22547/files#diff-6cd4de793a1c68d3d9415a246823b55eL151


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4077/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r220274562
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -67,28 +71,29 @@ class KafkaContinuousReadSupport(
         offsets
       }
     
    -  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
    -
    -  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
    -    new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
    -  }
    -
       override def deserializeOffset(json: String): Offset = {
         KafkaSourceOffset(JsonUtils.partitionOffsets(json))
       }
     
    -  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
    -    val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
    -    startOffsets.toSeq.map {
    -      case (topicPartition, start) =>
    -        KafkaContinuousInputPartition(
    -          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
    -    }.toArray
    -  }
    +  override def createContinuousScan(start: Offset): ContinuousScan = {
    +    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
    --- End diff --
    
    moved from https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebL162


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97206/testReport)** for PR 22547 at commit [`a35d98c`](https://github.com/apache/spark/commit/a35d98cc676c9b3c3b39a45d4b03b43dfe9d0767).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97206/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96570 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96570/testReport)** for PR 22547 at commit [`59d0abb`](https://github.com/apache/spark/commit/59d0abb74c461e3afcb83b69a4a6d48a2ee5197c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96567 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96567/testReport)** for PR 22547 at commit [`a349686`](https://github.com/apache/spark/commit/a3496862956f2ccc03ed076c9a9fbe1648be4c3b).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3442/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96563 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96563/testReport)** for PR 22547 at commit [`92dfdaf`](https://github.com/apache/spark/commit/92dfdaf990f2676d49766f5ab094e8b8a9a755b1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96563/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226363445
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -381,7 +390,7 @@ class StreamSuite extends StreamTest {
     
       test("insert an extraStrategy") {
         try {
    -      spark.experimental.extraStrategies = TestStrategy :: Nil
    +      spark.experimental.extraStrategies = CustomStrategy :: Nil
    --- End diff --
    
    Since we need to do a temporary planning for streaming queries, we can't allow custom strategy to remove streaming leaf nodes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97520/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by mccheah <gi...@git.apache.org>.
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r230973917
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReadSupport(
    +class KafkaContinuousInputStream(
    --- End diff --
    
    Makes sense. I really consider this to be a blocker on getting this merged and approved. It's difficult to have confidence in a review over such a large change. Thoughts @cloud-fan @rdblue?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97540/testReport)** for PR 22547 at commit [`9f63721`](https://github.com/apache/spark/commit/9f63721677cea627f43f7d536bb32b588cee30a3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226798538
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Format.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.DataSourceRegister;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
    + *
    + * The major responsibility of this interface is to return a {@link Table} for read/write.
    + */
    +@InterfaceStability.Evolving
    +public interface Format extends DataSourceV2 {
    +
    +  /**
    +   * Return a {@link Table} instance to do read/write with user-specified options.
    +   *
    +   * @param options the user-specified options that can identify a table, e.g. path, table name,
    --- End diff --
    
    Why is it necessary to pass table name and database to Format? Format should only be used in 2 places to create tables. First, in the DataFrameReader (or writer) API when a format is specified directly instead of a catalog/database/table or catalog/path. Second, it would be used in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226363020
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -154,21 +159,25 @@ class StreamSuite extends StreamTest {
       }
     
       test("SPARK-20432: union one stream with itself") {
    -    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
    -    val unioned = df.union(df)
    -    withTempDir { outputDir =>
    -      withTempDir { checkpointDir =>
    -        val query =
    -          unioned
    -            .writeStream.format("parquet")
    -            .option("checkpointLocation", checkpointDir.getAbsolutePath)
    -            .start(outputDir.getAbsolutePath)
    -        try {
    -          query.processAllAvailable()
    -          val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
    -          checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*)
    -        } finally {
    -          query.stop()
    +    val v1Source = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
    +    val v2Source = spark.readStream.format(classOf[FakeFormat].getName).load().select("a")
    +
    +    Seq(v1Source, v2Source).foreach { df =>
    --- End diff --
    
    improve this test to make sure v2 also works.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226789748
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java ---
    @@ -15,37 +15,43 @@
      * limitations under the License.
      */
     
    -package org.apache.spark.sql.sources.v2.reader;
    +package org.apache.spark.sql.sources.v2;
     
     import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.datasources.v2.NoopScanConfigBuilder;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +import org.apache.spark.sql.types.StructType;
     
     /**
    - * An interface that defines how to load the data from data source for batch processing.
    + * An interface representing a logical structured data set of a data source. For example, the
    + * implementation can be a directory on the file system, or a table in the catalog, etc.
      *
    - * The execution engine will get an instance of this interface from a data source provider
    - * (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
    - * query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
    - * The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
    - * {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
    - * factory to scan data from the data source with a Spark job.
    + * This interface can mixin the following interfaces to support different operations:
    + * <ul>
    + *   <li>{@link SupportsBatchRead}: this table can be read in batch queries.</li>
    + *   <li>{@link SupportsMicroBatchRead}: this table can be read in streaming queries with
    + *   micro-batch trigger.</li>
    + *   <li>{@link SupportsContinuousRead}: this table can be read in streaming queries with
    + *   continuous trigger.</li>
    + * </ul>
      */
     @InterfaceStability.Evolving
    -public interface BatchReadSupport extends ReadSupport {
    +public interface Table {
    +
    +  /**
    +   * Returns the schema of this table.
    +   */
    +  StructType schema();
     
       /**
        * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
        * {@link ScanConfig} for each data scanning job.
        *
        * The builder can take some query specific information to do operators pushdown, and keep these
        * information in the created {@link ScanConfig}.
    -   *
    -   * This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
    -   * to take {@link ScanConfig} as an input.
    -   */
    -  ScanConfigBuilder newScanConfigBuilder();
    -
    -  /**
    -   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
        */
    -  PartitionReaderFactory createReaderFactory(ScanConfig config);
    +  default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions options) {
    --- End diff --
    
    I think it should be clear that these are scan-specific options. Maybe add some documentation with an example of something that would be passed to configure a scan, like a target split size for combining.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    @jose-torres, I don't mean that the primary purpose of the v2 API is for catalog integration, I mean that the primary use of v2 is with tables that are stored in some catalog. So we should make sure that the plan and design work well with catalog tables.
    
    Another reason that catalog tables are important is that the v2 plans require a catalog for consistent behavior. So catalogs are important and I think will affect the implementation details.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    cc @rxin @jose-torres @rdblue 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r230989559
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReadSupport(
    +class KafkaContinuousInputStream(
    --- End diff --
    
    Yea I'll separate this PR into 3 smaller ones, after we have agreed on the high-level design at https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    A major part of this PR is to update existing streaming sources, which is just moving code around. There are 3 things we need to pay attention to during review:
    1. the naming and documentation of the new interfaces.
    2. the new streaming query planning workflow. See the PR description for details.
    3. the updated tests, make sure there is nothing wrong.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226784919
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.v2.reader.BatchScan;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to
    + * provide data reading ability for batch processing.
    + */
    +@InterfaceStability.Evolving
    +public interface SupportsBatchRead extends Table {
    +
    +  /**
    +   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options.
    +   *
    +   * @param config a {@link ScanConfig} which may contains operator pushdown information.
    +   * @param options the user-specified options, which is same as the one used to create the
    +   *                {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
    --- End diff --
    
    I don't think that options should be passed twice.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226338580
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.sources.v2.reader.BatchScan;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * A mix-in interface for {@link Table}. Table implementations can mixin this interface to
    + * provide data reading ability for batch processing.
    + */
    +@InterfaceStability.Evolving
    +public interface SupportsBatchRead extends Table {
    +
    +  /**
    +   * Creates a {@link BatchScan} instance with a {@link ScanConfig} and user-specified options.
    +   *
    +   * @param config a {@link ScanConfig} which may contains operator pushdown information.
    +   * @param options the user-specified options, which is same as the one used to create the
    +   *                {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
    --- End diff --
    
    Another choice is to let `ScanConfig` carry the options. But `ScanConfig` is an interface and doing this will put more work at user side, so I decided to pass the options again here. Feedbacks are welcome!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96566/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r226782371
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReadSupport(
    +class KafkaContinuousInputStream(
    --- End diff --
    
    Is it possible to break this change into multiple PRs for batch, microbatch, and continuous? It's really large and it would be nice if we could get the changes in incrementally.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #97520 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97520/testReport)** for PR 22547 at commit [`328bccd`](https://github.com/apache/spark/commit/328bccd68d13318c680d66c6e5e5bca8b5d10923).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    I agree that we need a shared understanding of the relationship between this work and the new catalog API. I was not under the impression that the primary purpose of v2 is to integrate catalog tables.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22547: [SPARK-25528][SQL] data source V2 read side API r...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22547#discussion_r230528510
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousInputStream.scala ---
    @@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReadSupport(
    +class KafkaContinuousInputStream(
    --- End diff --
    
    I'd prefer that the commits themselves compile, but since this is separating the modes I think it could be done incrementally.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    **[Test build #96566 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96566/testReport)** for PR 22547 at commit [`f08e02a`](https://github.com/apache/spark/commit/f08e02a6a8fdb070a01e3f2ac01b62a7d6f8dd9e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22547
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org