Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/08/06 17:14:15 UTC

[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/22009

    [SPARK-24882][SQL] improve data source v2 API

    ## What changes were proposed in this pull request?
    
    Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)
    
    Summary of the changes:
    1. rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens in the streaming and write APIs. (See the sketch after this list for how the renamed read-side pieces fit together.)
    2. create `ScanConfig` to store query-specific information like operator pushdown results, streaming offsets, etc. This makes the batch and streaming `ReadSupport` (previously named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies that applying operator pushdown and getting streaming offsets happen before everything else (getting input partitions, reporting statistics, etc.).
    3. split `InputPartition` into `InputPartition` and `PartitionReaderFactory`. This is a natural separation: data splitting and reading are orthogonal, and we should not mix them in one interface. This also makes the naming consistent between the read and write APIs: `PartitionReaderFactory` vs `DataWriterFactory`.
    4. separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend the batch interface, as we may need to override some batch methods to return false, or even leak streaming concepts into the batch API (e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`).
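
    As a rough, non-authoritative sketch (simplified Scala stand-ins with trimmed signatures, not the actual interfaces in this patch), the renamed read-side pieces fit together roughly like this:

        // Illustrative model only: names follow the PR, signatures are simplified.
        trait ScanConfig                                     // pushdown results, streaming offsets, ...
        trait InputPartition extends Serializable            // one split of the data to read

        trait PartitionReader[T] extends AutoCloseable {     // reads the records of one split
          def next(): Boolean
          def get(): T
        }

        trait PartitionReaderFactory extends Serializable {  // created once per scan, shipped to executors
          def createReader(partition: InputPartition): PartitionReader[AnyRef]
        }

        trait BatchReadSupport {                              // immutable; per-scan state lives in ScanConfig
          def planInputPartitions(config: ScanConfig): Array[InputPartition]
          def createReaderFactory(config: ScanConfig): PartitionReaderFactory
        }

        trait BatchReadSupportProvider {                      // what a data source implementation exposes
          def createBatchReadSupport(options: Map[String, String]): BatchReadSupport
        }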
    
    ## How was this patch tested?
    
    existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark redesign

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22009.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22009
    
----
commit 770a43dfdc1648dd0fb91eea2249da728cfdb360
Author: Wenchen Fan <we...@...>
Date:   2018-08-03T04:54:45Z

    improve data source v2 API

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    There must not be one. I thought you'd already started a PR, my mistake.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    > the scan config builder needs to accept the options
    
    This is the first item in the follow-up list. Currently the life cycle of a `ReadSupport` instance is bound to a batch/streaming query, so the user-specified options should be passed in when the `ReadSupport` instance is created.
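
    A minimal sketch of that life cycle (hypothetical names, simplified signatures; the real entry point takes Spark's case-insensitive `DataSourceOptions` rather than a plain map):

        // The options are consumed when the ReadSupport is created, so the
        // ReadSupport itself can stay immutable for the lifetime of the query.
        trait BatchReadSupport                                    // simplified stand-in
        case class PathBasedReadSupport(path: String) extends BatchReadSupport

        class ExampleProvider {
          def createBatchReadSupport(options: Map[String, String]): BatchReadSupport =
            PathBasedReadSupport(path = options.getOrElse("path", "/tmp/data"))
        }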
    
    > SaveMode needs to be removed
    
    We can't remove existing features. Even if we have a better way to write data, what we should do is deprecate `SaveMode`, not remove it. I don't think the other reviewers agree with removing `SaveMode`.
    
    > It's a bad idea to put half the changes in the read PR
    
    Yea, I totally agree, but we need to define "half". I don't think the write-side change here is just a half: it's complete with respect to all the data source features we have today. We can't say a change is half done because it doesn't support a feature that doesn't exist yet (the replaceWhere stuff). We can change the API later if it's necessary to support new features.
    
    Again, this PR is not marking the data source v2 API as stable. We can reject it if it does something wrong, but we can't just say "this doesn't have feature XYZ so we can't merge it". I tried my best to minimize the changes by keeping some designs/semantics the same as before (the pushdown API is not changed, the life cycle is not changed, etc.). If you have a better design or other ideas, please send a new PR for them instead of squashing them into this PR. In general we should improve data source v2 incrementally.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94732/
    Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94837/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208439490
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
    --- End diff --
    
    The problem is that we don't really have a columnar-batch API in Spark SQL. We rely on type erasure and a codegen hack to implement columnar scans. It's hardcoded in the engine: `SparkPlan#execute` returns `RDD[InternalRow]`.
    
    If we had an RDD that iterates over the rows in the batch, then whole-stage codegen would break, as it iterates the input RDD and casts each record to `ColumnarBatch`.
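
    A toy illustration of the erasure hack described above (stand-in types, not the real Spark classes):

        object ErasureHackSketch extends App {
          final class Row                                   // stands in for InternalRow
          final class ColumnarBatch(val numRows: Int)       // stands in for the vectorized batch

          val batches: Iterator[ColumnarBatch] = Iterator(new ColumnarBatch(1024))

          // The scan erases the element type so it can satisfy the row-based RDD contract...
          val asRows: Iterator[Row] = batches.asInstanceOf[Iterator[Row]]

          // ...and the consumer (the generated code) casts each record back to a batch.
          // Treating this "row" iterator as actual rows anywhere else would fail at runtime.
          asRows.foreach(r => println(r.asInstanceOf[ColumnarBatch].numRows))
        }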


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r210021990
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    Okay, I think I see where the misunderstanding is coming from: the current Spark implementation keeps track of offsets itself and doesn't commit those offsets to an external system. That makes sense, and I would not want Spark to make the ScanConfig mutable to update it with the scan's current offset.
    
    However, for a streaming API I think it is entirely possible for an implementation to commit offsets to an external system based on a scan, so it makes sense to me to pass the ScanConfig in for that purpose, given that a ScanConfig does represent a single consumer of a scan.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r223988838
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -169,15 +174,16 @@ object DataSourceV2Relation {
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
           userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val reader = source.createReader(options, userSpecifiedSchema)
    +    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    --- End diff --
    
    This looks like a regression compared to 2.3 - Data Source V2 is under heavy development, so I understand, but this is quite crucial. From a cursory look, this was introduced in https://github.com/apache/spark/commit/5fef6e3513d6023a837c427d183006d153c7102b
    
    I would suggest partially reverting this commit.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94439 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94439/testReport)** for PR 22009 at commit [`f620297`](https://github.com/apache/spark/commit/f6202975da271892b101987dff16eece713104f8).


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2235/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208337291
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java ---
    @@ -45,8 +51,8 @@
        * @param options the options for the returned data source reader, which is an immutable
        *                case-insensitive string-to-string map.
        */
    -  MicroBatchReader createMicroBatchReader(
    -      Optional<StructType> schema,
    -      String checkpointLocation,
    -      DataSourceOptions options);
    +  MicroBatchReadSupport createMicroBatchReadSupport(
    +    Optional<StructType> schema,
    --- End diff --
    
    Nit: whitespace change that doesn't fit with typical Java style of 2 indent levels for method params.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94306/
    Test FAILed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208337966
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,32 +21,32 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
        * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    +   *              jobs running at the same time, and the returned {@link BatchWriteSupport} can
        *              use this job id to distinguish itself from other jobs.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
        */
    -  Optional<DataSourceWriter> createWriter(
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
           String jobId, StructType schema, SaveMode mode, DataSourceOptions options);
    --- End diff --
    
    If it is done here, this should no longer return `Optional`.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211639298
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -24,16 +24,17 @@
     import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
     
     /**
    - * An interface that defines how to scan the data from data source for continuous streaming
    + * An interface that defines how to load the data from data source for continuous streaming
      * processing.
      *
    - * The execution engine will create an instance of this interface at the start of a streaming query,
    - * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for
    - * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true.
    - * The {@link ScanConfig} will be used to create input partitions and reader factory to process data
    - * for its duration. At the end {@link #stop()} will be called when the streaming execution is
    - * completed. Note that a single query may have multiple executions due to restart or failure
    - * recovery.
    + * The execution engine will get an instance of this interface from a data source provider
    + * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
    + * streaming query, then call {@link #newScanConfigBuilder(Offset)} to create an instance of
    + * {@link ScanConfig} for the duration of the streaming query or until
    + * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
    + * input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
    + * will be called when the streaming execution is completed. Note that a single query may have
    + * multiple executions due to restart or failure recovery.
    --- End diff --
    
    I would also add this documentation to the relevant methods. So getContinuousReadSupport and getMicroBatchReadSupport would say something like "Spark will call this method at the beginning of each streaming query to get a ReadSupport", and newScanConfigBuilder would say something like "Spark will get a ScanConfig once for each data scanning job".


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208638252
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient reader: DataSourceReader)
    +    @transient readSupport: ReadSupport,
    +    @transient scanConfig: ScanConfig)
       extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
       override def equals(other: Any): Boolean = other match {
         case other: DataSourceV2ScanExec =>
    -      output == other.output && reader.getClass == other.reader.getClass && options == other.options
    +      output == other.output && readSupport.getClass == other.readSupport.getClass &&
    +        options == other.options
         case _ => false
       }
     
       override def hashCode(): Int = {
         Seq(output, source, options).hashCode()
       }
     
    -  override def outputPartitioning: physical.Partitioning = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
    -      SinglePartition
    -
    -    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
    -      SinglePartition
    -
    -    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
    +  override def outputPartitioning: physical.Partitioning = readSupport match {
    +    case _ if partitions.length == 1 =>
           SinglePartition
     
         case s: SupportsReportPartitioning =>
           new DataSourcePartitioning(
    -        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
    +        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
     
         case _ => super.outputPartitioning
       }
     
    -  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
    -    reader.planInputPartitions().asScala
    -  }
    +  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
     
    -  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      assert(!reader.isInstanceOf[ContinuousReader],
    -        "continuous stream reader does not support columnar read yet.")
    -      r.planBatchInputPartitions().asScala
    -  }
    +  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)
     
    -  private lazy val inputRDD: RDD[InternalRow] = reader match {
    -    case _: ContinuousReader =>
    +  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
    +    case _: ContinuousReadSupport =>
    +      assert(!partitionReaderFactory.supportColumnarReads(),
    --- End diff --
    
    This is a slightly different case. Can Spark choose not to use columnar reads if the source returns true for `supportsColumnarReads`? If so, then this isn't a problem.
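
    A sketch of the kind of engine-side choice being suggested (hypothetical helper and simplified stand-in trait, not Spark code): the engine stays free to fall back to the row-based path even when the factory offers columnar reads.

        object ColumnarChoiceSketch {
          trait PartitionReaderFactory { def supportColumnarReads(): Boolean }

          // Columnar is used only when the source offers it AND the engine's own
          // constraints allow it (e.g. not for continuous streaming, per the assert above).
          def useColumnarPath(factory: PartitionReaderFactory, engineAllowsColumnar: Boolean): Boolean =
            engineAllowsColumnar && factory.supportColumnarReads()
        }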


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94895/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208423440
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    If Spark can serialize the offset based on the Offset interface, then it should deserialize to a generic Offset and use that to create a source-specific Offset. But then the Offset can't have any implementation-specific information, so why is the concrete class determined by the source anyway? This seems to me like a concrete case where the abstraction is failing.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94301/
    Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    It sounds like the sync between Apache and GitHub is down. Although it has been merged, the PR has not been closed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94675 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94675/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209698964
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,33 +21,39 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code Dataset.write.format(...).option(...).save()}.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    -   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *                  use this job id to distinguish itself from other jobs.
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    -   * @return a writer to append data to this data source
    +   * @return a write support to write data to this data source.
        */
    -  Optional<DataSourceWriter> createWriter(
    -      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
    +      String queryId,
    +      StructType schema,
    +      SaveMode mode,
    --- End diff --
    
    @cloud-fan: this is a bad idea **because** it enables save modes other than append for DataSourceV2 without using the new logical plans. This leads to undefined behavior and is why we proposed standard logical plans in the first place.
    
    Using a data source implementation directly should only support appending and scanning; anything more complex must require `DeleteSupport` (see #21308) or `TableCatalog` (see #21306) and the new logical plans. Otherwise, this will allow sources to expose behavior that we are trying to fix.
    
    I'm -1 on this.
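
    For reference, the user-facing call path in question (standard DataFrame API; the format name and path below are placeholders, and an existing SparkSession `spark` is assumed). The `SaveMode` chosen here is what gets forwarded to `createBatchWriteSupport`:

        // Direct use of a data source implementation via the DataFrame writer.
        spark.range(10).write
          .format("my-v2-source")        // placeholder format name
          .option("path", "/tmp/out")    // placeholder option
          .mode("overwrite")             // this SaveMode reaches createBatchWriteSupport
          .save()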


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94439 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94439/testReport)** for PR 22009 at commit [`f620297`](https://github.com/apache/spark/commit/f6202975da271892b101987dff16eece713104f8).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208437780
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java ---
    @@ -29,24 +28,24 @@
      * provide data writing ability for structured streaming.
      */
     @InterfaceStability.Evolving
    -public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
    +public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
     
    -    /**
    -     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
    -     * sources can return None if there is no writing needed to be done.
    -     *
    -     * @param queryId A unique string for the writing query. It's possible that there are many
    -     *                writing queries running at the same time, and the returned
    -     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
    -     * @param schema the schema of the data to be written.
    -     * @param mode the output mode which determines what successive epoch output means to this
    -     *             sink, please refer to {@link OutputMode} for more details.
    -     * @param options the options for the returned data source writer, which is an immutable
    -     *                case-insensitive string-to-string map.
    -     */
    -    StreamWriter createStreamWriter(
    -        String queryId,
    -        StructType schema,
    -        OutputMode mode,
    -        DataSourceOptions options);
    +  /**
    +   * Creates an optional {@link StreamingWriteSupport} to save the data to this data source. Data
    +   * sources can return None if there is no writing needed to be done.
    +   *
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
    +   * @param schema the schema of the data to be written.
    +   * @param mode the output mode which determines what successive epoch output means to this
    +   *             sink, please refer to {@link OutputMode} for more details.
    +   * @param options the options for the returned data source writer, which is an immutable
    +   *                case-insensitive string-to-string map.
    +   */
    +  StreamingWriteSupport createStreamingWritSupport(
    +    String queryId,
    --- End diff --
    
    For the batch API, I think we can remove the job id and ask the data source to generate a UUID itself. But for streaming, I'm not sure. Maybe we need it for failure recovery or streaming restart, cc @jose-torres


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1880/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208392865
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    Currently, there are two representations of any given offset: a connector-defined JVM object and a serialized JSON string.
    
    Spark can't build the JVM object itself because it doesn't know what the right type is. If you know of some clean way for a connector to declare "here is the type of my offsets", we should do that instead, but I only know how to do it through reflection magic more confusing than the status quo.
    
    I'd hesitate to introduce a third representation unless there's some concrete use case where JSON serialization won't work well.
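
    For illustration, with simplified stand-ins (not the real Spark classes), the two representations for a source whose offsets are plain longs could look like this:

        object OffsetSketch {
          abstract class Offset { def json: String }            // connector-defined JVM object + its JSON form

          case class LongOffset(value: Long) extends Offset {
            override def json: String = value.toString          // what Spark checkpoints
          }

          // Only the connector knows the concrete offset type, so it owns deserialization.
          // An invalid string surfaces as NumberFormatException, a subclass of IllegalArgumentException.
          def deserializeOffset(json: String): Offset = LongOffset(json.trim.toLong)
        }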


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94779 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94779/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1884/
    Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94574 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94574/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94297 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94297/testReport)** for PR 22009 at commit [`291304a`](https://github.com/apache/spark/commit/291304a511e0514aa43c99e87d7772a8d9dadc50).


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94377 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94377/testReport)** for PR 22009 at commit [`c224999`](https://github.com/apache/spark/commit/c22499964ac759670c3629c690f77018bc79a7c1).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208348226
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java ---
    @@ -20,18 +20,18 @@
     import org.apache.spark.annotation.InterfaceStability;
     
     /**
    - * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
    - * interface to report statistics to Spark.
    + * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
    + * report statistics to Spark.
      *
    - * Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader.
    - * Implementations that return more accurate statistics based on pushed operators will not improve
    - * query performance until the planner can push operators before getting stats.
    + * Currently statistics are reported to the optimizer before any operator is pushed to the data
    --- End diff --
    
    Nit: don't use "currently" in docs because it can become out of date and cause confusion. Instead, use "as of <version>" to be clear what "currently" means.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94574/
    Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    There's a reasonable chance that the
    
    Error adding data: Could not find index of the source to which data was added
    
    flakiness in the Kafka suite was caused by this PR. Let me know if you need help debugging.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209019530
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    I'd +1 passing a ScanConfig. I agree that all the existing sources are just going to pull out the offset, but "Spark is finished with this scan" is a cleaner semantic than "Spark is finished with the scan such that it goes up to this offset".


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208684225
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousPartitionReaderFactory.java ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import java.io.Serializable;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +
    +/**
    + * A factory used to create {@link ContinuousPartitionReader} instances.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousPartitionReaderFactory extends Serializable {
    --- End diff --
    
    @rdblue I have to fork a `PartitionReaderFactory` for continuous streaming, in order to use the name `createContinuousReader`. I also need to rename `ContinuousReadSupport.createReaderFactory` to `createContinuousReaderFactory`, which requires moving `createReaderFactory` out of the base interface `ReadSupport`.
    
    Is it really worth it? Existing sources only implement the `XYZReadSupportProvider` interfaces together on one class, not the `XYZReadSupport` interfaces. E.g. the micro-batch and continuous read supports are always two different classes, since the logic is so different. I believe the file source will be the same.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209013149
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import java.io.Serializable;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +
    +/**
    + * A factory used to create {@link PartitionReader} instances.
    + */
    +@InterfaceStability.Evolving
    +public interface PartitionReaderFactory extends Serializable {
    +
    +  /**
    +   * Returns a row-based partition reader to read data from the given {@link InputPartition}.
    +   *
    +   * Implementations probably need to cast the input partition to the concrete
    +   * {@link InputPartition} class defined for the data source.
    +   *
    +   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    +   * get retried until hitting the maximum retry times.
    +   */
    +  PartitionReader<InternalRow> createReader(InputPartition partition);
    +
    +  /**
    +   * Returns a columnar partition reader to read data from the given {@link InputPartition}.
    +   *
    +   * Implementations probably need to cast the input partition to the concrete
    +   * {@link InputPartition} class defined for the data source.
    +   *
    +   * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    +   * get retried until hitting the maximum retry times.
    +   */
    +  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    +    throw new UnsupportedOperationException("Cannot create columnar reader.");
    +  }
    +
    +  /**
    +   * Returns true if the given {@link InputPartition} should be read by Spark in a columnar way.
    +   * This means, implementations must also implement {@link #createColumnarReader(InputPartition)}
    +   * for the input partitions that this method returns true.
    +   *
    +   * As of Spark 2.4, Spark can only read all input partition in a columnar way, or none of them.
    +   * Data source can't mix columnar and row-based partitions. This will be relaxed in future
    --- End diff --
    
    nit: say "may be relaxed"; we shouldn't guarantee it


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211808923
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---
    @@ -47,7 +47,9 @@ trait KafkaContinuousTest extends KafkaSourceTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
    -          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
    +          case r: StreamingDataSourceV2Relation
    +              if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    +            r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
    --- End diff --
    
    good catch!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208337008
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java ---
    @@ -39,7 +52,7 @@
        * @param options the options for the returned data source reader, which is an immutable
        *                case-insensitive string-to-string map.
        */
    -  ContinuousReader createContinuousReader(
    +  ContinuousReadSupport createContinuousReadSupport(
         Optional<StructType> schema,
    --- End diff --
    
    I thought this was going to have two versions instead of passing the schema? FYI @jose-torres


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208439720
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
       }
     
       override def getPreferredLocations(split: Partition): Seq[String] = {
    -    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
    +    split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations()
    --- End diff --
    
    It's a common pattern in RDD implementations to cast `split` to the concrete `Partition` class defined by that RDD.
    
    The partitions are created in `RDD#getPartitions`, so if we see any other kind of split here, it's a bug.
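
    To make the pattern concrete, here is a minimal, hypothetical RDD (not from this PR) that follows the same convention: the partitions returned by `getPartitions` are the only ones `compute` and `getPreferredLocations` will ever see, so the cast is safe by construction.

    ```scala
    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    case class MyPartition(index: Int, hosts: Seq[String]) extends Partition

    class MyRDD(sc: SparkContext, parts: Seq[MyPartition]) extends RDD[Int](sc, Nil) {

      // The only partitions this RDD will ever hand back to the scheduler.
      override protected def getPartitions: Array[Partition] = parts.toArray[Partition]

      override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
        // Same cast pattern as DataSourceRDD: the split must be one of ours.
        val p = split.asInstanceOf[MyPartition]
        Iterator(p.index)
      }

      override def getPreferredLocations(split: Partition): Seq[String] =
        split.asInstanceOf[MyPartition].hosts
    }
    ```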


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1978/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209123977
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    This is an existing API, if we remove it, all the streaming sources can't work...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208380139
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
    --- End diff --
    
    Why is this necessary? I think the TODO should be handled in this commit and that Spark shouldn't cast RDD[ColumnarBatch] to RDD[InternalRow].
    
    What about having the RDD iterate over the rows in the batch to actually implement the interface? It can provide the underlying batches through a different API.
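
    For reference, a hedged sketch (not what this patch does) of how the row-based contract could be satisfied without the cast, by flattening each batch through `ColumnarBatch.rowIterator()`:

    ```scala
    import scala.collection.JavaConverters._

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // `batches` stands in for whatever the columnar partition reader produces.
    def flattenBatches(batches: Iterator[ColumnarBatch]): Iterator[InternalRow] =
      batches.flatMap(_.rowIterator().asScala)
    ```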


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208343665
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for all the batch and streaming read supports. Data sources should implement
    + * concrete read support interfaces like {@link BatchReadSupport}.
    + */
    +@InterfaceStability.Evolving
    +public interface ReadSupport {
    +
    +  /**
    +   * Returns the full schema of this data source, which is usually the physical schema of the
    +   * underlying storage. This full schema should not be affected by column pruning or other
    +   * optimizations.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  StructType fullSchema();
    +
    +  /**
    +   * Returns a list of {@link InputPartition}s. Each {@link InputPartition} represents a data split
    +   * that can be processed by one Spark task. The number of input partitions returned here is the
    +   * same as the number of RDD partitions this scan outputs.
    +   *
    +   * Note that, this may not be a full scan if the data source supports optimization like filter
    +   * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting
    +   * {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  InputPartition[] planInputPartitions(ScanConfig config);
    +
    +  /**
    +   * Returns a factory to produce {@link PartitionReader}s for {@link InputPartition}s.
    --- End diff --
    
    Minor: Adding 's' after a link isn't good Javadoc style.
    
    For cases like this, it is better to use the singular for both classes to communicate the expectation that each `InputPartition` produces a single `PartitionReader`.
    
    For cases where you don't need to communicate a one-to-one relationship, you can use `{@link InputPartition partitions}` to change the link text.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208635227
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    --- End diff --
    
    That's the problem: it forces those to be the same instance. With two separate methods, I could use the same class or I could choose not to. I think these should be separate calls, depending on the read support.
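
    To illustrate with stand-in types (not the real interfaces): with two distinct methods, one implementation can still back both with the same factory object, while another can keep them separate.

    ```scala
    trait RowReaderFactory
    trait ContinuousRowReaderFactory extends RowReaderFactory

    // One source chooses to share a single factory for both methods...
    class SharedFactorySupport {
      private val factory = new ContinuousRowReaderFactory {}
      def createReaderFactory(config: AnyRef): RowReaderFactory = factory
      def createContinuousReaderFactory(config: AnyRef): ContinuousRowReaderFactory = factory
    }

    // ...another keeps them entirely separate. A single inherited method allows no such choice.
    class SeparateFactorySupport {
      def createReaderFactory(config: AnyRef): RowReaderFactory =
        new RowReaderFactory {}
      def createContinuousReaderFactory(config: AnyRef): ContinuousRowReaderFactory =
        new ContinuousRowReaderFactory {}
    }
    ```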


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95032 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95032/testReport)** for PR 22009 at commit [`85c2476`](https://github.com/apache/spark/commit/85c247634e78512066eb530789f2a5481d00dc39).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94294 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94294/testReport)** for PR 22009 at commit [`770a43d`](https://github.com/apache/spark/commit/770a43dfdc1648dd0fb91eea2249da728cfdb360).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1992/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94484/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94466/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94895/testReport)** for PR 22009 at commit [`76f8e6b`](https://github.com/apache/spark/commit/76f8e6b9f6074b2a67c74d3a4b76eadbf7130894).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208544232
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -270,11 +269,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         }
       }
     
    -  override def createStreamWriter(
    +  override def createStreamingWritSupport(
    --- End diff --
    
    good catch! fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r210154588
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateControlMicroBatchReadSupport.scala ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.sources
    +
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset}
    +
    +// A special `MicroBatchReadSupport` that can get latestOffset with a start offset.
    +trait RateControlMicroBatchReadSupport extends MicroBatchReadSupport {
    --- End diff --
    
    Everything under `org.apache.spark.sql.execution` is considered private; that's why we don't add `private[...]` to classes in that package.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    @rdblue can you point me to the other PR? This is the only PR I have sent out for the data source v2 API improvement. I'd appreciate your time to review it, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208370391
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    Why must this be JSON and why must it be a String? Why not byte[] and let the implementation choose the representation it prefers?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94330 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94330/testReport)** for PR 22009 at commit [`2f6d1d2`](https://github.com/apache/spark/commit/2f6d1d27a2a5aabc0db87b2e97f7f8e6fd6fe91c).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1885/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208165977
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetsOnlyScanConfigBuilder.scala ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder}
    +
    +/**
    + * A very simple [[ScanConfigBuilder]] and [[ScanConfig]] implementation that carries offsets for
    + * streaming data sources.
    + */
    +case class OffsetsOnlyScanConfigBuilder(start: Offset, end: Option[Offset] = None)
    +  extends ScanConfigBuilder with ScanConfig {
    --- End diff --
    
    Otherwise we need to create two very similar classes. I'm fine with either approach.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208390264
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    If Spark uses JSON to serialize, why can't Spark handle deserialization itself? 
    
    Why not require `Offset` to have a human-readable `toString` and a `toBytes` for serialization? We don't have to conflate serialization with human readability.
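
    A hypothetical sketch of that alternative shape (this is not the API in this PR; `humanReadable` is a made-up name):

    ```scala
    abstract class BinaryOffset {
      /** What Spark would persist in the offset log. */
      def toBytes: Array[Byte]

      /** What humans would read in logs and the UI. */
      def humanReadable: String

      override def toString: String = humanReadable
    }
    ```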


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208133162
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for all the batch and streaming read supports. Data sources should implement
    + * concrete read support interfaces like {@link BatchReadSupport}.
    + */
    +@InterfaceStability.Evolving
    +public interface ReadSupport {
    +
    +  /**
    +   * Returns the full schema of this data source, which is usually the physical schema of the
    +   * underlying storage. This full schema should not be affected by column pruning or other
    +   * optimizations.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  StructType fullSchema();
    --- End diff --
    
    Can we change this API to return a `ScanConfigBuilder`? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94802 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94802/testReport)** for PR 22009 at commit [`e6e599a`](https://github.com/apache/spark/commit/e6e599a9630801078c046d3aec398cf4f046945c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94895/testReport)** for PR 22009 at commit [`76f8e6b`](https://github.com/apache/spark/commit/76f8e6b9f6074b2a67c74d3a4b76eadbf7130894).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  implicit class InlineHelper(val sc: StringContext) extends AnyVal `
      * `case class Inline(codeString: String) extends JavaCode `
      * `case class TransformKeys(`
      * `case class TransformValues(`
      * `case class MapZipWith(left: Expression, right: Expression, function: Expression)`
      * `case class ZipWith(left: Expression, right: Expression, function: Expression)`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94689/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1915/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208370532
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    --- End diff --
    
    Should this be `oldestOffset`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94306 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94306/testReport)** for PR 22009 at commit [`779c0a0`](https://github.com/apache/spark/commit/779c0a07503e35cfcc9031a51faa93f65459e326).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22009


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94837 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94837/testReport)** for PR 22009 at commit [`0318b4b`](https://github.com/apache/spark/commit/0318b4b1dcbfde0024945308578cedf8d4a09168).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95071/testReport)** for PR 22009 at commit [`8833b67`](https://github.com/apache/spark/commit/8833b675852a9464896fca6f393ba531e7ff288c).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208335523
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java ---
    @@ -20,17 +20,30 @@
     import java.util.Optional;
     
     import org.apache.spark.annotation.InterfaceStability;
    -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
    +import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data reading ability for continuous stream processing.
    + * provide data reading ability for stream processing(continuous mode).
      */
     @InterfaceStability.Evolving
    -public interface ContinuousReadSupport extends DataSourceV2 {
    +public interface ContinuousReadSupportProvider extends DataSourceV2 {
    +
       /**
    -   * Creates a {@link ContinuousReader} to scan the data from this data source.
    +   * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
    +   *
    +   * The execution engine will create a {@link ContinuousReadSupport} at the start of a streaming
    +   * query, alternate calls to {@link ContinuousReadSupport#newScanConfigBuilder(Offset)}
    --- End diff --
    
    I think this should be clearer about normal operation and reconfiguration. It should say that Spark will call `newScanConfigBuilder` and will use that `ReadSupport` instance for the duration of the streaming app, or until `needsReconfiguration` is true. Reconfiguration starts over by calling `newScanConfigBuilder` again. That would be clearer for implementers.
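
    A self-contained sketch of that lifecycle, with a simplified stand-in for the proposed read support interface (not the real one):

    ```scala
    trait SimpleContinuousReadSupport {
      def newScanConfig(start: Long): AnyRef            // stands in for newScanConfigBuilder(...).build()
      def planInputPartitions(config: AnyRef): Seq[Int] // stands in for InputPartition[]
      def needsReconfiguration(config: AnyRef): Boolean
    }

    def runContinuousQuery(
        support: SimpleContinuousReadSupport,
        initialOffset: Long,
        latestCommittedOffset: () => Long): Unit = {
      var offset = initialOffset
      while (true) {
        val config = support.newScanConfig(offset)           // one config per (re)configuration
        val partitions = support.planInputPartitions(config)
        // ... launch long-running tasks for `partitions` with this config (omitted) ...
        while (!support.needsReconfiguration(config)) {
          Thread.sleep(100)                                  // keep running with the same config
        }
        offset = latestCommittedOffset()                     // reconfiguration starts over from here
      }
    }
    ```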


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94330/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2211/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1977/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208425199
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    I think I understand what you're saying. I could get behind a proposal to simply define "arbitrary JSON string" as the one and only offset type, with each connector responsible for writing and parsing JSON however it'd like. All the existing offsets are trivial case classes anyway; it'd be a bit of a migration, but nothing architecturally difficult to handle.
    
    I don't see how a `toBytes` method would help the problem. Neither arbitrary byte arrays nor arbitrary JSON strings let Spark know what type it's supposed to instantiate.
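    
    For reference, a minimal sketch of the kind of trivial case-class offset I mean, assuming the v2 `Offset` base class keeps a `json()` method as the counterpart of `deserializeOffset` (the class and field names here are made up, and the JSON format is entirely source-defined):
    
    ```scala
    import org.json4s.DefaultFormats
    import org.json4s.jackson.Serialization
    import org.apache.spark.sql.sources.v2.reader.streaming.Offset
    
    // The payload is just a key-value map, so serializing it as JSON is trivial.
    case class ExampleOffset(partitionOffsets: Map[String, Long]) extends Offset {
      private implicit val formats = DefaultFormats
      override def json(): String = Serialization.write(partitionOffsets)
    }
    
    object ExampleOffset {
      private implicit val formats = DefaultFormats
      // The counterpart a source would use inside deserializeOffset(json).
      def fromJson(json: String): ExampleOffset =
        ExampleOffset(Serialization.read[Map[String, Long]](json))
    }
    ```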


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94466 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94466/testReport)** for PR 22009 at commit [`60fb76d`](https://github.com/apache/spark/commit/60fb76d537166e51307fe99c183a95fc499e289c).
     * This patch **fails to generate documentation**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94798 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94798/testReport)** for PR 22009 at commit [`e6e599a`](https://github.com/apache/spark/commit/e6e599a9630801078c046d3aec398cf4f046945c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94779/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208982830
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java ---
    @@ -18,19 +18,22 @@
     package org.apache.spark.sql.sources.v2;
     
     import org.apache.spark.annotation.InterfaceStability;
    -import org.apache.spark.sql.sources.DataSourceRegister;
    -import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
    +import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data reading ability and scan the data from the data source.
    + * provide data reading ability for batch processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code SparkSession.read.format(...).option(...).load()}.
      */
     @InterfaceStability.Evolving
    -public interface ReadSupport extends DataSourceV2 {
    +public interface BatchReadSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates a {@link DataSourceReader} to scan the data from this data source.
    +   * Creates a {@link BatchReadSupport} to scan the data from this data source.
    --- End diff --
    
    nit: ... from this data source with a user specified schema.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2279/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1854/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208439150
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    For commit, the only thing it's interested in is the end offset. Even if we pass in a `ScanConfig`, I think the implementation would just get the end offset from the `ScanConfig` and commit.
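    
    For example, something like this (illustrative only, assuming `Offset` exposes a `json()` string as the counterpart of `deserializeOffset`; the class and helpers are stubs, not Spark API):
    
    ```scala
    import org.apache.spark.sql.sources.v2.reader.streaming.Offset
    
    class ExampleReadSupport {
      def commit(end: Offset): Unit = {
        // All commit needs is the end offset: unpack it and drop whatever was
        // buffered at or below it.
        val committed = parsePartitionOffsets(end.json())
        committed.foreach { case (partition, offset) => releaseUpTo(partition, offset) }
      }
    
      private def parsePartitionOffsets(json: String): Map[Int, Long] = Map.empty  // illustrative stub
      private def releaseUpTo(partition: Int, offset: Long): Unit = ()             // illustrative stub
    }
    ```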


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95026 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95026/testReport)** for PR 22009 at commit [`9acda35`](https://github.com/apache/spark/commit/9acda3567a415cd06a74c0daaa85ea9feb607237).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209041367
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * An interface that carries query specific information for the data scan. Currently it's used to
    --- End diff --
    
    Nit: use of "currently"


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94667 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94667/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94448 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94448/testReport)** for PR 22009 at commit [`c4b5469`](https://github.com/apache/spark/commit/c4b5469bc12ac77906f4f4b7cf94fd355ba4b7be).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208440273
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
             sparkContext,
             sqlContext.conf.continuousStreamingExecutorQueueSize,
             sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
    -        partitions).asInstanceOf[RDD[InternalRow]]
    -
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
    +        partitions,
    +        schema,
    +        partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
    --- End diff --
    
    `DataSourceV2ScanExec` is shared between batch and streaming, so the `partitionReaderFactory` here has the general type instead of the concrete `ContinuousPartitionReaderFactory`. I think we can avoid this cast in a future refactoring, when we have a dedicated scan plan for continuous streaming.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r226780862
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -169,15 +174,16 @@ object DataSourceV2Relation {
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
           userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val reader = source.createReader(options, userSpecifiedSchema)
    +    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    --- End diff --
    
    In the long term, I don't think that sources should use the reader to get a schema. This is a temporary hack until we have catalog support, which is really where schemas should come from.
    
    The way this works in our version (which is substantially ahead of upstream Spark, unfortunately) is that a Table is loaded by a Catalog. The schema reported by that table is used to validate writes. That way, the table can report its schema and Spark knows that data written must be compatible with that schema, but the source isn't required to be readable.
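    
    Roughly this shape, as a sketch (the trait names here are hypothetical; the real `TableCatalog` proposal is in #21306):
    
    ```scala
    import org.apache.spark.sql.types.StructType
    
    trait Table {
      def name: String
      def schema: StructType   // reported by the table; used to validate writes
    }
    
    trait TableCatalog {
      def loadTable(ident: String): Table
    }
    
    object WriteValidationSketch {
      // A write path validates against the table's reported schema without ever
      // asking the source for a reader.
      def validateWrite(catalog: TableCatalog, ident: String, dataSchema: StructType): Unit = {
        val table = catalog.loadTable(ident)
        val tableCols = table.schema.fieldNames.toSet
        require(dataSchema.fieldNames.forall(tableCols.contains),
          s"Cannot write columns that are not in table ${table.name}")
      }
    }
    ```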


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94675 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94675/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208340913
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +
    +/**
    + * An interface which defines how to scan the data from data source for batch processing.
    + */
    +@InterfaceStability.Evolving
    +public interface BatchReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, and keep these information in the created {@link ScanConfig}.
    --- End diff --
    
    I don't think this statement is very accurate. I would not say that the builder is passed operators to push down because that would imply that it is passed `Projection` and `Filter` nodes.
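    
    What the builder actually gets during pushdown is the required schema (and, via the analogous filter mix-in, `sources.Filter` expressions), not `Projection`/`Filter` plan nodes. A sketch of an implementation, assuming `ScanConfig` only needs to report the pruned read schema (its `StructType` import in the diff above suggests a schema accessor); the class names are made up:
    
    ```scala
    import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder, SupportsPushDownRequiredColumns}
    import org.apache.spark.sql.types.StructType
    
    // Assumed minimal ScanConfig implementation: it only carries the schema to read.
    case class ExampleScanConfig(readSchema: StructType) extends ScanConfig
    
    class ExampleScanConfigBuilder(fullSchema: StructType)
        extends ScanConfigBuilder with SupportsPushDownRequiredColumns {
    
      private var prunedSchema: StructType = fullSchema
    
      // Spark passes the needed columns as a StructType, not as a Project node.
      override def pruneColumns(requiredSchema: StructType): Unit = {
        prunedSchema = requiredSchema
      }
    
      override def build(): ScanConfig = ExampleScanConfig(prunedSchema)
    }
    ```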


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2198/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208680325
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala ---
    @@ -162,23 +141,52 @@ case class KafkaContinuousInputPartition(
         startOffset: Long,
         kafkaParams: ju.Map[String, Object],
         pollTimeoutMs: Long,
    -    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
    +    failOnDataLoss: Boolean) extends InputPartition
     
    +object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
       override def createContinuousReader(
    -      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
    -    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
    -    require(kafkaOffset.topicPartition == topicPartition,
    -      s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
    -    new KafkaContinuousInputPartitionReader(
    -      topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
    +      partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
    +    val p = partition.asInstanceOf[KafkaContinuousInputPartition]
    +    new KafkaContinuousPartitionReader(
    +      p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss)
       }
    +}
    +
    +class KafkaContinuousScanConfigBuilder(
    +    schema: StructType,
    +    startOffset: Offset,
    +    offsetReader: KafkaOffsetReader,
    +    reportDataLoss: String => Unit)
    +  extends ScanConfigBuilder {
    +
    +  override def build(): ScanConfig = {
    +    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
    --- End diff --
    
    Moved from https://github.com/apache/spark/pull/22009/files#diff-b35752a92e5ab595a6360d6123c7b7b8L93


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95033/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94675/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94340 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94340/testReport)** for PR 22009 at commit [`c224999`](https://github.com/apache/spark/commit/c22499964ac759670c3629c690f77018bc79a7c1).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209726516
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -76,41 +76,43 @@ object DataSourceV2Strategy extends Strategy {
       /**
        * Applies column pruning to the data source, w.r.t. the references of the given expressions.
        *
    -   * @return new output attributes after column pruning.
    +   * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    +   *         and new output attributes after column pruning.
    --- End diff --
    
    Why is column pruning not "technically" operator pushdown? This is done by pushing a Project operator to the source.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94444 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94444/testReport)** for PR 22009 at commit [`2b1c22a`](https://github.com/apache/spark/commit/2b1c22a0e2056c78b5985fbca7759e6a3f3c9280).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208390359
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    --- End diff --
    
    I was thinking oldest available, but it's a minor point.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209094853
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    I'm not sure what you mean by "scan state" here. The thing that is scanned needs to know what offsets are available for scanning, which requires holding a Kafka consumer to read that information.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208333413
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java ---
    @@ -20,17 +20,30 @@
     import java.util.Optional;
     
     import org.apache.spark.annotation.InterfaceStability;
    -import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
    +import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data reading ability for continuous stream processing.
    + * provide data reading ability for stream processing(continuous mode).
    --- End diff --
    
    Nit: a space is missing. Why not use "for continuous stream processing" instead? I think that's clearer.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94330/testReport)** for PR 22009 at commit [`2f6d1d2`](https://github.com/apache/spark/commit/2f6d1d27a2a5aabc0db87b2e97f7f8e6fd6fe91c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95071/testReport)** for PR 22009 at commit [`8833b67`](https://github.com/apache/spark/commit/8833b675852a9464896fca6f393ba531e7ff288c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2208/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208641014
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    As I said, I'm fine with defining arbitrary JSON strings as the single non-customizable offset type, if you think that would be better. (I think they would have to be strings, because making a JSON object the type would mean packaging some JSON library into the API.) I don't think it would ever be correct to have an Offset class which doesn't trivially reduce to a key-value map.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208114465
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java ---
    @@ -21,22 +21,25 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
    + * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this
      * interface to push down required columns to the data source and only read these columns during
      * scan to reduce the size of the data to be read.
      */
     @InterfaceStability.Evolving
    -public interface SupportsPushDownRequiredColumns extends DataSourceReader {
    +public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder {
     
       /**
        * Applies column pruning w.r.t. the given requiredSchema.
        *
        * Implementation should try its best to prune the unnecessary columns or nested fields, but it's
        * also OK to do the pruning partially, e.g., a data source may not be able to prune nested
        * fields, and only prune top-level columns.
    -   *
    -   * Note that, data source readers should update {@link DataSourceReader#readSchema()} after
    -   * applying column pruning.
        */
       void pruneColumns(StructType requiredSchema);
    --- End diff --
    
    Since we have a new method `prunedSchema`, should we rename this to `pruneSchema`? The parameter is also a schema.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208637663
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
       }
     
       override def getPreferredLocations(split: Partition): Seq[String] = {
    -    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
    +    split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations()
    --- End diff --
    
    Makes sense. If it's a bug, then the error message should indicate to users that it's a bug instead of throwing a `ClassCastException`. Even if someone goes to the code here, there's no comment to indicate that it's a Spark bug.
    
    I'd prefer something like this:
    ```scala
    override def getPreferredLocations(split: Partition): Seq[String] = {
      split match {
        case dsp: DataSourceRDDPartition => dsp.inputPartition.preferredLocations
        case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
      }
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209022127
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java ---
    @@ -27,10 +27,10 @@
     @InterfaceStability.Evolving
     public interface SessionConfigSupport extends DataSourceV2 {
    --- End diff --
    
    @cloud-fan, in the `TableCatalog` addition, #21306, I added this to configure catalogs. That way, a catalog is how you add configuration for all tables. Anonymous tables from the `ReadSupportProvider` classes don't really need this. I think we should remove the session config support in favor of using the catalog initialization config.
    
    I also added a CaseInsensitiveStringMap for passing config, which we should move to instead of DataSourceOptions. There are two main differences:
    1. DataSourceOptions has methods to retrieve paths, table name, etc. that aren't needed when we use catalogs. Named tables and path-based tables should use catalogs and not pass these through options. Anonymous tables don't need standard ways to pass this information.
    2. It is clearer what the class does: it provides a string-to-string mapping with case-insensitive keys (sketched below). It isn't as obvious what DataSourceOptions does.
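    
    For example, a minimal sketch of the second point (not the actual class from #21306, just an illustration of the behavior):
    
    ```scala
    import java.util.Locale
    
    class CaseInsensitiveStringMap(entries: Map[String, String]) {
      private val normalized: Map[String, String] =
        entries.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
    
      // Keys are matched regardless of case.
      def get(key: String): Option[String] =
        normalized.get(key.toLowerCase(Locale.ROOT))
    
      def getOrDefault(key: String, default: String): String =
        get(key).getOrElse(default)
    }
    
    // Usage:
    //   val opts = new CaseInsensitiveStringMap(Map("Path" -> "/tmp/data"))
    //   opts.get("path")  // Some("/tmp/data")
    ```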


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208635572
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * micro-batch mode.
    + */
    +@InterfaceStability.Evolving
    +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
    +
    +  /**
    +   * Returns the most recent offset available.
    +   */
    +  Offset latestOffset(Offset start);
    --- End diff --
    
    Can you explain that more? Isn't the latest offset always the same? How does it depend on start?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94667/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94689 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94689/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94614 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94614/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94377 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94377/testReport)** for PR 22009 at commit [`c224999`](https://github.com/apache/spark/commit/c22499964ac759670c3629c690f77018bc79a7c1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208383579
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
             sparkContext,
             sqlContext.conf.continuousStreamingExecutorQueueSize,
             sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
    -        partitions).asInstanceOf[RDD[InternalRow]]
    -
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
    +        partitions,
    +        schema,
    +        partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
    --- End diff --
    
    This should not cast. Just call `readSupport.createContinuousReaderFactory(...)` here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209040246
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * micro-batch mode.
    + */
    +@InterfaceStability.Evolving
    +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
    +
    +  /**
    +   * Returns the most recent offset available.
    +   */
    +  Offset latestOffset(Offset start);
    --- End diff --
    
    I agree in principle, but I don't know of any way to special-case it without adding it to the API.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209021329
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.writer.streaming;
    +
    +import java.io.Serializable;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.sources.v2.writer.DataWriter;
    +
    +/**
    + * A factory of {@link DataWriter} returned by
    + * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
    + * and initializing the actual data writer at executor side.
    + *
    + * Note that, the writer factory will be serialized and sent to executors, then the data writer
    + * will be created on executors and do the actual writing. So this interface must be
    + * serializable and {@link DataWriter} doesn't need to be.
    + */
    +@InterfaceStability.Evolving
    +public interface StreamingDataWriterFactory extends Serializable {
    +
    +  /**
    +   * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
    +   * object instance when sending data to the data writer, for better performance. Data writers
    +   * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
    +   * list.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   *
    +   * @param partitionId A unique id of the RDD partition that the returned writer will process.
    +   *                    Usually Spark processes many RDD partitions at the same time,
    +   *                    implementations should use the partition id to distinguish writers for
    +   *                    different partitions.
    +   * @param taskId A unique identifier for a task that is performing the write of the partition
    +   *               data. Spark may run multiple tasks for the same partition (due to speculation
    +   *               or task failures, for example).
    --- End diff --
    
    Is it the ID of the task or the ID of one particular attempt of the task? (The target audience here is people who know a reasonable amount about Spark - I think we should just say TaskContext.taskAttemptId() if that's what this is.)
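
    For illustration, a minimal (hypothetical) factory: if `taskId` is the task attempt id, the writer can derive attempt-unique output names even when speculative attempts run for the same partition. The directory layout is made up; the `createWriter(partitionId, taskId, epochId)` signature is the one this PR proposes for the streaming factory.

        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
        import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory

        class ExampleStreamingWriterFactory(baseDir: String) extends StreamingDataWriterFactory {
          override def createWriter(
              partitionId: Int,
              taskId: Long,
              epochId: Long): DataWriter[InternalRow] = {
            // Unique per attempt only if taskId is TaskContext.taskAttemptId(),
            // which is exactly the ambiguity the javadoc should resolve.
            val outputPath = s"$baseDir/epoch=$epochId/part-$partitionId-attempt-$taskId"
            new DataWriter[InternalRow] {
              override def write(record: InternalRow): Unit = ()  // no-op for the sketch
              override def commit(): WriterCommitMessage = new WriterCommitMessage {
                override def toString: String = outputPath
              }
              override def abort(): Unit = ()
            }
          }
        }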


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208389947
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    +
    +  /**
    +   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
    +   * for each partition to a single global offset.
    +   */
    +  Offset mergeOffsets(PartitionOffset[] offsets);
    +
    +  /**
    +   * The execution engine will call this method in every epoch to determine if new input
    +   * partitions need to be generated, which may be required if for example the underlying
    +   * source system has had partitions added or removed.
    +   *
    +   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
    +   * instance.
    +   */
    +  default boolean needsReconfiguration() {
    --- End diff --
    
    I think of a ReadSupport as something that can be read or scanned and ContinuousReadSupport as a stream that can be read. In that abstraction, the "something that can be read" probably isn't the right place to track whether a particular scan requires reconfiguration: a *scan* requires reconfiguration if that scan is based on partitions that are out of date.
    
    To me, that indicates that a Kafka `ScanConfig` should keep track of Kafka partitions and then `needsReconfiguration` should return true if the Kafka topic now has a different set of partitions than the ones in the `ScanConfig`. Does that make sense?
    
    I think it would also be more consistent in the API to add `ScanConfig` here.
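
    Concretely, something like this (the Kafka-side names are hypothetical, `needsReconfiguration` is shown with the `ScanConfig` parameter suggested above, and `readSchema()` is assumed to be what `ScanConfig` requires):

        import org.apache.spark.sql.sources.v2.reader.ScanConfig
        import org.apache.spark.sql.types.StructType

        // Hypothetical: the scan remembers which topic partitions it was planned against.
        case class KafkaScanConfig(schema: StructType, knownPartitions: Set[Int]) extends ScanConfig {
          override def readSchema(): StructType = schema
        }

        // The scan is stale when the topic's current partitions differ from the
        // ones captured in the ScanConfig at planning time.
        def needsReconfiguration(config: ScanConfig, currentPartitions: Set[Int]): Boolean =
          config match {
            case k: KafkaScanConfig => currentPartitions != k.knownPartitions
            case _ => false
          }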


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94338 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94338/testReport)** for PR 22009 at commit [`29b4f33`](https://github.com/apache/spark/commit/29b4f33567ccf6335dfd7dbfee620008c7d50b81).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208987767
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java ---
    @@ -22,18 +22,16 @@
     import org.apache.spark.annotation.InterfaceStability;
     
     /**
    - * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
    - * responsible for creating the actual data reader of one RDD partition.
    - * The relationship between {@link InputPartition} and {@link InputPartitionReader}
    - * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
    + * An input partition returned by {@link ReadSupport#planInputPartitions(ScanConfig)}, which
    + * represents a data split that should be processed by one Spark task.
    --- End diff --
    
    I'm not sure we need to talk about "data split" - I don't think people will try to implement data sources without knowing what a partition is in Spark.
    
    I'd suggest saying "A serializable representation of an input partition...", to make it clear that this should just contain metadata required to identify what the partition is and not the actual data.
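
    For example (a hypothetical file-based source; the partition carries only the serializable metadata needed to identify the split, never the rows themselves):

        import org.apache.spark.sql.sources.v2.reader.InputPartition

        // Serialized and shipped to executors; the PartitionReader created there
        // does the actual reading.
        case class FileSlicePartition(
            path: String,
            start: Long,
            length: Long,
            hosts: Array[String]) extends InputPartition {
          override def preferredLocations(): Array[String] = hosts
        }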


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208383098
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient reader: DataSourceReader)
    +    @transient readSupport: ReadSupport,
    +    @transient scanConfig: ScanConfig)
       extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
       override def equals(other: Any): Boolean = other match {
         case other: DataSourceV2ScanExec =>
    -      output == other.output && reader.getClass == other.reader.getClass && options == other.options
    +      output == other.output && readSupport.getClass == other.readSupport.getClass &&
    +        options == other.options
         case _ => false
       }
     
       override def hashCode(): Int = {
         Seq(output, source, options).hashCode()
       }
     
    -  override def outputPartitioning: physical.Partitioning = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
    -      SinglePartition
    -
    -    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
    -      SinglePartition
    -
    -    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
    +  override def outputPartitioning: physical.Partitioning = readSupport match {
    +    case _ if partitions.length == 1 =>
           SinglePartition
     
         case s: SupportsReportPartitioning =>
           new DataSourcePartitioning(
    -        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
    +        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
     
         case _ => super.outputPartitioning
       }
     
    -  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
    -    reader.planInputPartitions().asScala
    -  }
    +  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
     
    -  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      assert(!reader.isInstanceOf[ContinuousReader],
    -        "continuous stream reader does not support columnar read yet.")
    -      r.planBatchInputPartitions().asScala
    -  }
    +  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)
     
    -  private lazy val inputRDD: RDD[InternalRow] = reader match {
    -    case _: ContinuousReader =>
    +  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
    +    case _: ContinuousReadSupport =>
    +      assert(!partitionReaderFactory.supportColumnarReads(),
    --- End diff --
    
    Can't Spark choose to use InternalRow reads instead? Why can't the source support both?
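
    For what it's worth, a source could expose both, along these lines (hypothetical class; this assumes the factory keeps the row-based `createReader` alongside `createColumnarReader` and the `supportColumnarReads` flag used above):

        import org.apache.spark.sql.catalyst.InternalRow
        import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
        import org.apache.spark.sql.vectorized.ColumnarBatch

        // A factory that can serve either mode; the engine could then pick
        // row-based readers for continuous queries even when columnar reads
        // are available for batch.
        class DualModeReaderFactory extends PartitionReaderFactory {
          override def supportColumnarReads(): Boolean = true
          override def createReader(p: InputPartition): PartitionReader[InternalRow] = ???
          override def createColumnarReader(p: InputPartition): PartitionReader[ColumnarBatch] = ???
        }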


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2380/
    Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94443/
    Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94337 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94337/testReport)** for PR 22009 at commit [`2fc3b05`](https://github.com/apache/spark/commit/2fc3b05dde2c405e2af72b815700780f114767c1).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94297 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94297/testReport)** for PR 22009 at commit [`291304a`](https://github.com/apache/spark/commit/291304a511e0514aa43c99e87d7772a8d9dadc50).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `   * `
      * `   * `


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208095752
  
    --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java ---
    @@ -1,114 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package test.org.apache.spark.sql.sources.v2;
    -
    -import java.io.IOException;
    -import java.util.List;
    -
    -import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    -import org.apache.spark.sql.sources.v2.DataSourceOptions;
    -import org.apache.spark.sql.sources.v2.DataSourceV2;
    -import org.apache.spark.sql.sources.v2.ReadSupport;
    -import org.apache.spark.sql.sources.v2.reader.*;
    -import org.apache.spark.sql.types.DataTypes;
    -import org.apache.spark.sql.types.StructType;
    -import org.apache.spark.sql.vectorized.ColumnVector;
    -import org.apache.spark.sql.vectorized.ColumnarBatch;
    -
    -
    -public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
    --- End diff --
    
    This is renamed to `JavaColumnarDataSourceV2`, to avoid confusion about batch vs streaming.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208661027
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
    --- End diff --
    
    SGTM. Can we do it in a follow-up? The type hack has been there for years and the fix may be non-trivial. We also need to fix the file source side.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208438810
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    --- End diff --
    
    I did it in many places, to allow a data source to implement both batch and streaming without conflicts. But here it's a little different: `ContinuousPartitionReaderFactory` is a child of `PartitionReaderFactory`, which means a data source can return a `ContinuousPartitionReaderFactory` for both batch and streaming.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94690/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r210024440
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    @jose-torres, that's a fair point; this isn't a great place to decide whether serialization should be JSON or not.
    
    It also looks like both serialization and deserialization are delegated to the source in that design doc, since Offset implementations must have a `json` method. That was a big part of the concern I pointed out, so requiring a source to provide both should be okay.
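
    As a concrete illustration (a hypothetical single-long offset; the only contract relied on here is that `Offset` implementations expose `json()` and that the source can turn that string back into an offset in `deserializeOffset`):

        import org.apache.spark.sql.sources.v2.reader.streaming.Offset

        // Hypothetical offset: a single monotonically increasing position.
        class SimpleLongOffset(val position: Long) extends Offset {
          override def json(): String = s"""{"position":$position}"""
        }

        // The matching deserializeOffset on the read support just undoes json();
        // the parsing here is deliberately naive for the sketch.
        def deserializeOffset(json: String): SimpleLongOffset =
          new SimpleLongOffset(json.replaceAll("[^0-9-]", "").toLong)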


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94484 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94484/testReport)** for PR 22009 at commit [`6728d33`](https://github.com/apache/spark/commit/6728d334b154b1cef24a26d83134366df8ebbc21).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95029/
    Test FAILed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r210155543
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,33 +21,39 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code Dataset.write.format(...).option(...).save()}.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    -   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *                  use this job id to distinguish itself from other jobs.
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    -   * @return a writer to append data to this data source
    +   * @return a write support to write data to this data source.
        */
    -  Optional<DataSourceWriter> createWriter(
    -      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
    +      String queryId,
    +      StructType schema,
    +      SaveMode mode,
    --- End diff --
    
    I totally agree that `SaveMode` is a bad API which leads to undefined behavior. That's why we started a project to design new DDL logical plans and write APIs. However, I believe we agreed before that we can't remove existing APIs, so `DataFrameWriter` and `SaveMode` will still be there in Spark. If I were a data source developer, even after implementing the new write APIs (assuming they're finished), I would still support `SaveMode` to attract more users. `DataFrameWriter` is a very widely used API and end users may need a long time to migrate to the new write APIs. BTW, the file source (without a catalog) does have clearly defined behavior regarding `SaveMode`, and we should make it possible to migrate the file source to data source v2.
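
    For reference, the user-facing path being preserved is just the existing `DataFrameWriter` call (the format name below is made up):

        import org.apache.spark.sql.{DataFrame, SaveMode}

        def saveWithV2(df: DataFrame): Unit = {
          // For a v2 source this goes through createBatchWriteSupport with the
          // user's SaveMode.
          df.write
            .format("com.example.v2source") // hypothetical data source v2 implementation
            .mode(SaveMode.ErrorIfExists)
            .save()
        }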


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211660836
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java ---
    @@ -18,48 +18,44 @@
     package org.apache.spark.sql.sources.v2;
     
     import org.apache.spark.annotation.InterfaceStability;
    -import org.apache.spark.sql.sources.DataSourceRegister;
    -import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
    +import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data reading ability and scan the data from the data source.
    + * provide data reading ability for batch processing.
    + *
    + * This interface is used to create {@link BatchReadSupport} instances when end users run
    + * {@code SparkSession.read.format(...).option(...).load()}.
      */
     @InterfaceStability.Evolving
    -public interface ReadSupport extends DataSourceV2 {
    +public interface BatchReadSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates a {@link DataSourceReader} to scan the data from this data source.
    +   * Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
    --- End diff --
    
    Although Spark is OK with reusing the same `ReadSupport` instance across queries, this interface (`BatchReadSupportProvider`) is not going to support it. It's instantiated by reflection every time `DataFrameReader.load` is called, so the implementation cannot reuse the same `ReadSupport` instance.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208808788
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * The base interface for all the batch and streaming read supports. Data sources should implement
    + * concrete read support interfaces like {@link BatchReadSupport}.
    + */
    +@InterfaceStability.Evolving
    +public interface ReadSupport {
    +
    +  /**
    +   * Returns the full schema of this data source, which is usually the physical schema of the
    +   * underlying storage. This full schema should not be affected by column pruning or other
    +   * optimizations.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  StructType fullSchema();
    --- End diff --
    
    This is needed for both batch and streaming.
    
    For a streaming source, the schema must be consistent across epochs, so `fullSchema` should be defined in `ReadSupport`, not `ScanConfig`. Each epoch may still have a different `readSchema`, assuming different epochs can apply column pruning differently.
    
    For batch, Spark needs the data source to report its schema before any optimization happens, and at that point the `ScanConfig` has not been created yet.
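
    A tiny illustration of the split (hypothetical column names; `fullSchema` is what the immutable `ReadSupport` reports up front, while each `ScanConfig` carries whatever pruned `readSchema` that particular scan ended up with):

        import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

        // Reported by ReadSupport.fullSchema() before any optimization.
        val fullSchema = StructType(Seq(
          StructField("id", LongType),
          StructField("name", StringType),
          StructField("payload", StringType)))

        // What one particular scan might report as its readSchema() after column
        // pruning; another epoch or query could prune differently.
        val readSchemaForThisScan = StructType(
          fullSchema.filter(f => Set("id", "name").contains(f.name)))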


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2169/
    Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94336 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94336/testReport)** for PR 22009 at commit [`cab6d28`](https://github.com/apache/spark/commit/cab6d2828dacaca6e62d3409c684d18a1fc861f2).


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1857/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209018297
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface that defines how to scan the data from data source for continuous streaming
    + * processing.
    + *
    + * The execution engine will create an instance of this interface at the start of a streaming query,
    + * then call {@link #newScanConfigBuilder(Offset)} and create an instance of {@link ScanConfig} for
    + * the duration of the streaming query or until {@link #needsReconfiguration(ScanConfig)} is true.
    + * The {@link ScanConfig} will be used to create input partitions and reader factory to process data
    + * for its duration. At the end {@link #stop()} will be called when the streaming execution is
    + * completed. Note that a single query may have multiple executions due to restart or failure
    + * recovery.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * to do operators pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
    +
    +  /**
    +   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
    +   * for each partition to a single global offset.
    +   */
    +  Offset mergeOffsets(PartitionOffset[] offsets);
    +
    +  /**
    +   * The execution engine will call this method in every epoch to determine if new input
    +   * partitions need to be generated, which may be required if for example the underlying
    +   * source system has had partitions added or removed.
    +   *
    +   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
    --- End diff --
    
    No, I think we just need a new `ScanConfig`. (But this PR is already very large and that will require execution-layer changes, so I'd suggest filing a follow-up for that.)


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94802/
    Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95033 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95033/testReport)** for PR 22009 at commit [`f938614`](https://github.com/apache/spark/commit/f93861467fc1660dd18f6913d0a18dc3e4488e20).


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208372512
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface which defines how to scan the data from data source for streaming processing with
    + * micro-batch mode.
    + */
    +@InterfaceStability.Evolving
    +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query specific information
    +   * like which operators to pushdown, streaming offsets, etc., and keep these information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
    +   * needs to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
    +
    +  /**
    +   * Returns the most recent offset available.
    +   */
    +  Offset latestOffset(Offset start);
    --- End diff --
    
    Why does this accept a starting offset?


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1886/
    Test PASSed.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94306 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94306/testReport)** for PR 22009 at commit [`779c0a0`](https://github.com/apache/spark/commit/779c0a07503e35cfcc9031a51faa93f65459e326).
     * This patch **fails Java style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1859/
    Test PASSed.


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208380370
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
       }
     
       override def getPreferredLocations(split: Partition): Seq[String] = {
    -    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
    +    split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations()
    --- End diff --
    
    Why doesn't this use `match` to check locality and default to no locality?
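
    Something along these lines, for instance (sketch only):

        // Match on the expected partition type; anything else means no locality
        // preference rather than a ClassCastException.
        override def getPreferredLocations(split: Partition): Seq[String] = split match {
          case p: DataSourceRDDPartition => p.inputPartition.preferredLocations()
          case _ => Nil
        }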


---



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211739221
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---
    @@ -47,7 +47,9 @@ trait KafkaContinuousTest extends KafkaSourceTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
    -          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
    +          case r: StreamingDataSourceV2Relation
    +              if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    +            r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
    --- End diff --
    
    I think this logic is subtly incorrect (and what's causing the flakiness in the continuous test). It needs to get the actual scan config being used from DataSourceV2ScanExec in the physical plan; `r.scanConfigBuilder.build()` will always produce the most up-to-date `knownPartitions` value rather than the one the running scan was planned with.
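    
    For illustration, a rough sketch of pulling the config out of the physical plan instead (whether DataSourceV2ScanExec exposes `readSupport` and the built config as `scanConfig` is an assumption here, not something shown in this diff):
    
        // sketch: read the ScanConfig actually planned into the physical scan node
        query.lastExecution.executedPlan.collectFirst {
          case scan: DataSourceV2ScanExec
              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
        }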


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94337 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94337/testReport)** for PR 22009 at commit [`2fc3b05`](https://github.com/apache/spark/commit/2fc3b05dde2c405e2af72b815700780f114767c1).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94377/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208638600
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
             sparkContext,
             sqlContext.conf.continuousStreamingExecutorQueueSize,
             sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
    -        partitions).asInstanceOf[RDD[InternalRow]]
    -
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
    +        partitions,
    +        schema,
    +        partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
    --- End diff --
    
    However you want to do it is fine with me, but I've seen excessive casting in the SQL back-end so I'm against adding it when it isn't necessary, like this case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2135/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209301833
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SessionConfigSupport.java ---
    @@ -27,10 +27,10 @@
     @InterfaceStability.Evolving
     public interface SessionConfigSupport extends DataSourceV2 {
    --- End diff --
    
    How about we keep `DataSourceOptions` a pure string-to-string map and move these `getPath`, `getTableName` methods to a wrapper class? Anyway, we can do it in a follow-up.
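    
    Roughly what I have in mind, as a sketch (the wrapper name and the option keys are hypothetical, and it assumes `DataSourceOptions#get` keeps returning a Java `Optional`):
    
        // sketch: DataSourceOptions stays a pure string-to-string map,
        // and the convenience getters move to a wrapper
        class DataSourceOptionsWrapper(options: DataSourceOptions) {
          def getPath: java.util.Optional[String] = options.get("path")
          def getTableName: java.util.Optional[String] = options.get("table")
        }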


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208340329
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/BatchReadSupport.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +
    +/**
    + * An interface which defines how to scan the data from data source for batch processing.
    --- End diff --
    
    Nit: "which" -> "that"


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2134/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209022769
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupportProvider.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
    +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * provide data reading ability for continuous stream processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code SparkSession.readStream.format(...).option(...).load()}.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupportProvider extends DataSourceV2 {
    +
    +  /**
    +   * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   *
    +   * @param schema the user provided schema.
    +   * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
    +   *                           recovery. Readers for the same logical source in the same query
    +   *                           will be given the same checkpointLocation.
    +   * @param options the options for the returned data source reader, which is an immutable
    +   *                case-insensitive string-to-string map.
    +   *
    +   * By default this method throws {@link UnsupportedOperationException}, implementations should
    --- End diff --
    
    Javadoc style: this should go above the params.
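    
    I.e. something like this (description text drawn from this diff, param text shortened):
    
        /**
         * Creates a {@link ContinuousReadSupport} to scan the data from this streaming data source.
         *
         * By default this method throws {@link UnsupportedOperationException}; implementations should
         * override it if continuous mode is supported.
         *
         * @param schema the user provided schema.
         * @param checkpointLocation a path to Hadoop FS scratch space used for failure recovery.
         * @param options the options for the returned data source reader.
         */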


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    @cloud-fan, are there more things to be done here than rebasing?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94798 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94798/testReport)** for PR 22009 at commit [`e6e599a`](https://github.com/apache/spark/commit/e6e599a9630801078c046d3aec398cf4f046945c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208636636
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala ---
    @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
             valuePrepared
           }
     
    -      override def next(): T = {
    +      override def next(): Any = {
             if (!hasNext) {
               throw new java.util.NoSuchElementException("End of stream")
             }
             valuePrepared = false
             reader.get()
           }
         }
    -    new InterruptibleIterator(context, iter)
    +    // TODO: get rid of this type hack.
    +    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
    --- End diff --
    
    Where is the issue to fix this hack?
    
    This seems like something that should never have happened. We can simply have an additional trait on an `RDD[InternalRow]` to fetch the underlying iterator of `ColumnarBatch`. The codegen path should check for that and use it.
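    
    A very rough sketch of that shape (the trait name and method here are hypothetical, not an existing Spark API):
    
        // sketch: codegen would pattern-match on this trait instead of relying on a cast
        trait HasColumnarBatchIterator { self: RDD[InternalRow] =>
          def columnarBatchIterator(split: Partition, context: TaskContext): Iterator[ColumnarBatch]
        }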


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1956/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r223982672
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -169,15 +174,16 @@ object DataSourceV2Relation {
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
           userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val reader = source.createReader(options, userSpecifiedSchema)
    +    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    --- End diff --
    
    This looks not directly related to this PR, but I think this is a good place to ask: why do we create a read support in the write path?
    
    https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L249
    
    Retrieving the physical schema of the underlying storage is potentially expensive. Even worse, it looks odd that the write path requires the read side's schema. Which schema should we expect here in the write path?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94823 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94823/testReport)** for PR 22009 at commit [`2018981`](https://github.com/apache/spark/commit/2018981f1261dc8c0b63572be78410b3274d0f4a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208334273
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchReadSupportProvider.java ---
    @@ -19,18 +19,18 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.sources.DataSourceRegister;
    -import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
    +import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data reading ability and scan the data from the data source.
    + * provide data reading ability for batch processing.
    --- End diff --
    
    I think this interface (and the continuous and micro-batch equivalents) should note that returning a `ReadSupport` from options is for sources with no catalog support or to use an implementation directly. Maybe we should add this after #21306 is in though. What do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94448/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94667 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94667/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208425737
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    I guess my core point is, we should stick with the existing serialization mechanism unless there's some kind of serialization we need to do which only a byte array can express. The serialization mechanism reaches deep into the execution layer, so coupling it with a connector API revamp is awkward.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209023367
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java ---
    @@ -23,8 +23,9 @@
      * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
      *
      * Note that this is an empty interface. Data source implementations should mix-in at least one of
    - * the plug-in interfaces like {@link ReadSupport} and {@link WriteSupport}. Otherwise it's just
    - * a dummy data source which is un-readable/writable.
    + * the provider interfaces like {@link BatchReadSupportProvider} or
    --- End diff --
    
    I think this should say that implementations can mix in BatchReadSupportProvider etc. to provide anonymous tables for use with the DF writer API. When `TableCatalog` is in, this won't be required at all.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94443/testReport)** for PR 22009 at commit [`063fe27`](https://github.com/apache/spark/commit/063fe27bf3b31fbbb2442fa96fa39a98360e4a28).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208372469
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    --- End diff --
    
    The offsets are ultimately exposed as JSON inside the JSON representation of StreamingQueryProgress. It's important for visibility and debuggability that progress events contain human-readable representations of offsets.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208384141
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -80,17 +80,17 @@ object DataSourceV2Strategy extends Strategy {
        */
       // TODO: nested column pruning.
       private def pruneColumns(
    -      reader: DataSourceReader,
    +      configBuilder: ScanConfigBuilder,
           relation: DataSourceV2Relation,
           exprs: Seq[Expression]): Seq[AttributeReference] = {
    -    reader match {
    +    configBuilder match {
           case r: SupportsPushDownRequiredColumns =>
             val requiredColumns = AttributeSet(exprs.flatMap(_.references))
             val neededOutput = relation.output.filter(requiredColumns.contains)
             if (neededOutput != relation.output) {
               r.pruneColumns(neededOutput.toStructType)
               val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
    -          r.readSchema().toAttributes.map {
    +          r.prunedSchema().toAttributes.map {
    --- End diff --
    
    As I noted earlier, this shouldn't get the scan's schema until the scan is fully configured.
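    
    Roughly the ordering I'd expect, sketched against this PR's API (usage here is illustrative only):
    
        // finish pushdown first, build the ScanConfig, and only then read the final schema
        r.pruneColumns(neededOutput.toStructType)
        val config = configBuilder.build()
        val prunedAttrs = config.readSchema().toAttributes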


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r224004348
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -169,15 +174,16 @@ object DataSourceV2Relation {
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
           userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val reader = source.createReader(options, userSpecifiedSchema)
    +    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    --- End diff --
    
    This is kind of a behavior change: now when we append data to a data source, the data source must be readable in order to provide a schema, which will be used to validate the input data schema.
    
    I don't have a strong feeling. Data source v2 is marked as evolving, so a necessary behavior change is fine. cc @rdblue


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94340/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95033 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95033/testReport)** for PR 22009 at commit [`f938614`](https://github.com/apache/spark/commit/f93861467fc1660dd18f6913d0a18dc3e4488e20).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2051/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95071/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95029 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95029/testReport)** for PR 22009 at commit [`ca80080`](https://github.com/apache/spark/commit/ca80080c3069339a8b88356cbb604ad5838944d9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209712363
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,33 +21,39 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code Dataset.write.format(...).option(...).save()}.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    -   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *                  use this job id to distinguish itself from other jobs.
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    -   * @return a writer to append data to this data source
    +   * @return a write support to write data to this data source.
        */
    -  Optional<DataSourceWriter> createWriter(
    -      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
    +      String queryId,
    +      StructType schema,
    +      SaveMode mode,
    --- End diff --
    
    I think that the v2 sources should only use the plans proposed in the SPIP. I also think that the v2 data source API should always tell the data source exactly what to do: for overwrite, what should be deleted and what should be added.
    
    That doesn't block fixing the v2 API here and doesn't prevent anyone from using it. But it would prevent people from relying on undefined behavior that results from passing an ambiguous `SaveMode` to a source.
    
    The only thing that would not be available by doing this is overwrite support by using the SaveMode, which isn't something anyone should rely on because it doesn't have defined behavior.
    
    I understand that this may seem like it would block migration from the v1 API to the v2 API. If so, that's the right thing to do until we clearly define how v2 sources behave.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209708483
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,33 +21,39 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
    + *
    + * This interface is used when end users want to use a data source implementation directly, e.g.
    + * {@code Dataset.write.format(...).option(...).save()}.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    -   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *                  use this job id to distinguish itself from other jobs.
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    -   * @return a writer to append data to this data source
    +   * @return a write support to write data to this data source.
        */
    -  Optional<DataSourceWriter> createWriter(
    -      String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options);
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
    +      String queryId,
    +      StructType schema,
    +      SaveMode mode,
    --- End diff --
    
    To clarify, your proposal is that we should block the completion of DataSourceV2 until the new logical plans are in place?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95076 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95076/testReport)** for PR 22009 at commit [`51cda76`](https://github.com/apache/spark/commit/51cda76897353344427aaa666e29be408263eeb1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r224023061
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -169,15 +174,16 @@ object DataSourceV2Relation {
           options: Map[String, String],
           tableIdent: Option[TableIdentifier] = None,
           userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
    -    val reader = source.createReader(options, userSpecifiedSchema)
    +    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
    --- End diff --
    
    Another point is which schema you would expect the data source to return in that case. For instance, `spark.range(1).write.format("source").save()`. It's odd that `source` should provide a schema; logically it's weird. How does the source provide the schema?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94732 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94732/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208808091
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -76,41 +76,43 @@ object DataSourceV2Strategy extends Strategy {
       /**
        * Applies column pruning to the data source, w.r.t. the references of the given expressions.
        *
    -   * @return new output attributes after column pruning.
    +   * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    +   *         and new output attributes after column pruning.
    --- End diff --
    
    Technically column pruning is not operator pushdown. It's kind of a property of the SQL operator, indicating which columns/nested fields the parent operator needs.
    
    Column pruning is orthogonal to any operator pushdown: when operator pushdown is finished, Spark will check the remaining query plan on the Spark side (assuming some predicates are not pushed), calculate the required columns for the scan node, and pass them as a hint to the data source.
    
    I'm fine with the current API, but we should have a better API for column pruning in the future.
    
    BTW, for now I think adding `prunedSchema` to `SupportsPushDownRequiredColumns` is better: data sources that don't support operator pushdown then don't need to implement `ScanConfig.readSchema`. But I don't have a strong opinion.
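    
    A sketch of that alternative (written Scala-style for brevity; the real interface is Java, and only the placement of `prunedSchema` is the suggestion here):
    
        trait SupportsPushDownRequiredColumns extends ScanConfigBuilder {
          def pruneColumns(requiredSchema: StructType): Unit
          // the pruned schema lives on the pushdown mix-in rather than on ScanConfig
          def prunedSchema(): StructType
        }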


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2411/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95076/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208333129
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java ---
    @@ -21,32 +21,32 @@
     
     import org.apache.spark.annotation.InterfaceStability;
     import org.apache.spark.sql.SaveMode;
    -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
    +import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
     import org.apache.spark.sql.types.StructType;
     
     /**
      * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    - * provide data writing ability and save the data to the data source.
    + * provide data writing ability for batch processing.
      */
     @InterfaceStability.Evolving
    -public interface WriteSupport extends DataSourceV2 {
    +public interface BatchWriteSupportProvider extends DataSourceV2 {
     
       /**
    -   * Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
    +   * Creates an optional {@link BatchWriteSupport} to save the data to this data source. Data
        * sources can return None if there is no writing needed to be done according to the save mode.
        *
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
        * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    +   *              jobs running at the same time, and the returned {@link BatchWriteSupport} can
        *              use this job id to distinguish itself from other jobs.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
        */
    -  Optional<DataSourceWriter> createWriter(
    +  Optional<BatchWriteSupport> createBatchWriteSupport(
           String jobId, StructType schema, SaveMode mode, DataSourceOptions options);
    --- End diff --
    
    The proposal removes the `jobId` string and the `SaveMode`. Shouldn't that be done in this PR?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208368798
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface that defines how to scan data from a data source for streaming processing in
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query-specific information
    +   * like which operators to push down, streaming offsets, etc., and keep this information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * need to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    +
    +  /**
    +   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
    +   * for each partition to a single global offset.
    +   */
    +  Offset mergeOffsets(PartitionOffset[] offsets);
    +
    +  /**
    +   * The execution engine will call this method in every epoch to determine if new input
    +   * partitions need to be generated, which may be required if for example the underlying
    +   * source system has had partitions added or removed.
    +   *
    +   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
    +   * instance.
    +   */
    +  default boolean needsReconfiguration() {
    --- End diff --
    
    Why doesn't this accept a `ScanConfig`? Aren't changes to the source only relevant if they affect a scan?
    
    cc @jose-torres.
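
    Something along these lines (a sketch of the suggested signature only, not code from the PR):

        // Hypothetical sketch: scope the reconfiguration check to a particular scan,
        // so a change in the underlying source only matters if it affects that ScanConfig.
        default boolean needsReconfiguration(ScanConfig config) {
          return false;
        }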


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94301 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94301/testReport)** for PR 22009 at commit [`6bf8e9d`](https://github.com/apache/spark/commit/6bf8e9d72979adc0b7b452eaf152086622481fd3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #95029 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95029/testReport)** for PR 22009 at commit [`ca80080`](https://github.com/apache/spark/commit/ca80080c3069339a8b88356cbb604ad5838944d9).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94336/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208391449
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface that defines how to scan data from a data source for streaming processing in
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query-specific information
    +   * like which operators to push down, streaming offsets, etc., and keep this information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * need to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    +
    +  /**
    +   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
    +   * for each partition to a single global offset.
    +   */
    +  Offset mergeOffsets(PartitionOffset[] offsets);
    +
    +  /**
    +   * The execution engine will call this method in every epoch to determine if new input
    +   * partitions need to be generated, which may be required if for example the underlying
    +   * source system has had partitions added or removed.
    +   *
    +   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
    +   * instance.
    +   */
    +  default boolean needsReconfiguration() {
    --- End diff --
    
    Makes sense to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211628439
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala ---
    @@ -47,70 +46,49 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReader(
    +class KafkaContinuousReadSupport(
         offsetReader: KafkaOffsetReader,
         kafkaParams: ju.Map[String, Object],
         sourceOptions: Map[String, String],
         metadataPath: String,
         initialOffsets: KafkaOffsetRangeLimit,
         failOnDataLoss: Boolean)
    -  extends ContinuousReader with Logging {
    -
    -  private lazy val session = SparkSession.getActiveSession.get
    -  private lazy val sc = session.sparkContext
    +  extends ContinuousReadSupport with Logging {
     
       private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
     
    -  // Initialized when creating reader factories. If this diverges from the partitions at the latest
    -  // offsets, we need to reconfigure.
    -  // Exposed outside this object only for unit tests.
    -  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
    --- End diff --
    
    Moved to `KafkaContinuousScanConfig`
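
    To illustrate the idea (a rough Java-style sketch, not the actual Scala class; it assumes `ScanConfig` exposes a `readSchema()` method as in this PR):

        import java.util.Set;

        import org.apache.kafka.common.TopicPartition;
        import org.apache.spark.sql.sources.v2.reader.ScanConfig;
        import org.apache.spark.sql.types.StructType;

        // Hypothetical sketch: per-scan state such as the known partitions lives in the
        // ScanConfig, so the read support itself stays immutable.
        class KafkaContinuousScanConfigSketch implements ScanConfig {
          private final Set<TopicPartition> knownPartitions;
          private final StructType readSchema;

          KafkaContinuousScanConfigSketch(Set<TopicPartition> knownPartitions, StructType readSchema) {
            this.knownPartitions = knownPartitions;
            this.readSchema = readSchema;
          }

          Set<TopicPartition> knownPartitions() {
            return knownPartitions;
          }

          @Override
          public StructType readSchema() {
            return readSchema;
          }
        }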


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94467 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94467/testReport)** for PR 22009 at commit [`6728d33`](https://github.com/apache/spark/commit/6728d334b154b1cef24a26d83134366df8ebbc21).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94467/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Does this replace the other PR? I haven't looked at that one yet. If this is ready to review and follows the doc, I can review it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94294 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94294/testReport)** for PR 22009 at commit [`770a43d`](https://github.com/apache/spark/commit/770a43dfdc1648dd0fb91eea2249da728cfdb360).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    @cloud-fan Post these follow-up tasks in the PR description?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209094259
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    Why not? A `ScanConfig` represents the scan. The thing that is scanned shouldn't store the scan state.
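
    Concretely, something like this (a sketch of the alternative signature, not code in the PR):

        // Hypothetical sketch: tie the commit to the scan that produced the offsets,
        // so the read support itself stays free of per-scan state.
        void commit(ScanConfig config, Offset end);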


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208642760
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface that defines how to scan data from a data source for streaming processing in
    + * micro-batch mode.
    + */
    +@InterfaceStability.Evolving
    +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query-specific information
    +   * like which operators to push down, streaming offsets, etc., and keep this information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
    +   * need to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
    +
    +  /**
    +   * Returns the most recent offset available.
    +   */
    +  Offset latestOffset(Offset start);
    --- End diff --
    
    There's a weak form of rate control implemented by simply having sources lie about what the latest offset is. For example, you might set maxOffsetsPerTrigger = 100, and then the Kafka source will pretend that only 100 more offsets exist even if more are really available.
    
    Unfortunately, we're going to need to continue to support such options at least until the next major version after we have better rate limiting, so I don't think this can be removed from the source API right now.
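
    To illustrate the "lying" approach (a made-up sketch; `MyOffset`, `maxOffsetsPerTrigger` and `queryLatestFromSource` are placeholders, not the actual Kafka source code):

        // Hypothetical sketch: cap the reported latest offset so Spark never plans a
        // micro-batch larger than maxOffsetsPerTrigger.
        @Override
        public Offset latestOffset(Offset start) {
          long actualLatest = queryLatestFromSource();
          long startValue = ((MyOffset) start).value();
          return new MyOffset(Math.min(actualLatest, startValue + maxOffsetsPerTrigger));
        }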


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209042148
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.sql.sources.v2.reader.ReadSupport;
    +
    +/**
    + * A base interface for streaming read support. This is package private and is invisible to data
    + * sources. Data sources should implement concrete streaming read support interfaces:
    + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
    + */
    +interface StreamingReadSupport extends ReadSupport {
    +
    +  /**
    +   * Returns the initial offset for a streaming query to start reading from. Note that the
    +   * streaming data source should not assume that it will start reading from its
    +   * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from
    +   * the check-pointed offset rather than the initial one.
    +   */
    +  Offset initialOffset();
    +
    +  /**
    +   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
    +   *
    +   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
    +   */
    +  Offset deserializeOffset(String json);
    +
    +  /**
    +   * Informs the source that Spark has completed processing all data for offsets less than or
    +   * equal to `end` and will only request offsets greater than `end` in the future.
    +   */
    +  void commit(Offset end);
    --- End diff --
    
    In Kafka, isn't the ScanConfig what identifies the consumer group? @jose-torres 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94484 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94484/testReport)** for PR 22009 at commit [`6728d33`](https://github.com/apache/spark/commit/6728d334b154b1cef24a26d83134366df8ebbc21).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94732 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94732/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94779 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94779/testReport)** for PR 22009 at commit [`f4f85a8`](https://github.com/apache/spark/commit/f4f85a833ef319a6860134e12655574aca081ed6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208660320
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala ---
    @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient reader: DataSourceReader)
    +    @transient readSupport: ReadSupport,
    +    @transient scanConfig: ScanConfig)
       extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
       override def equals(other: Any): Boolean = other match {
         case other: DataSourceV2ScanExec =>
    -      output == other.output && reader.getClass == other.reader.getClass && options == other.options
    +      output == other.output && readSupport.getClass == other.readSupport.getClass &&
    +        options == other.options
         case _ => false
       }
     
       override def hashCode(): Int = {
         Seq(output, source, options).hashCode()
       }
     
    -  override def outputPartitioning: physical.Partitioning = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
    -      SinglePartition
    -
    -    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
    -      SinglePartition
    -
    -    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
    +  override def outputPartitioning: physical.Partitioning = readSupport match {
    +    case _ if partitions.length == 1 =>
           SinglePartition
     
         case s: SupportsReportPartitioning =>
           new DataSourcePartitioning(
    -        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
    +        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
     
         case _ => super.outputPartitioning
       }
     
    -  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
    -    reader.planInputPartitions().asScala
    -  }
    +  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
     
    -  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
    -    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
    -      assert(!reader.isInstanceOf[ContinuousReader],
    -        "continuous stream reader does not support columnar read yet.")
    -      r.planBatchInputPartitions().asScala
    -  }
    +  private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig)
     
    -  private lazy val inputRDD: RDD[InternalRow] = reader match {
    -    case _: ContinuousReader =>
    +  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
    +    case _: ContinuousReadSupport =>
    +      assert(!partitionReaderFactory.supportColumnarReads(),
    --- End diff --
    
    I think the answer is no. We may have columnar-only data sources, so I don't think it's a good idea to let Spark decide the scan mode.
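
    For example (a rough sketch, assuming the factory methods look roughly like the ones in this PR; `MyColumnarPartitionReader` is a placeholder):

        // Hypothetical sketch of a columnar-only source: only the factory knows that
        // row-based reads are impossible, so Spark cannot pick the scan mode itself.
        class ColumnarOnlyReaderFactory implements PartitionReaderFactory {
          @Override
          public PartitionReader<InternalRow> createReader(InputPartition partition) {
            throw new UnsupportedOperationException("this source only supports columnar reads");
          }

          @Override
          public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
            return new MyColumnarPartitionReader(partition);
          }

          @Override
          public boolean supportColumnarReads() {
            return true;
          }
        }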


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94448 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94448/testReport)** for PR 22009 at commit [`c4b5469`](https://github.com/apache/spark/commit/c4b5469bc12ac77906f4f4b7cf94fd355ba4b7be).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208338263
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java ---
    @@ -29,24 +28,24 @@
      * provide data writing ability for structured streaming.
      */
     @InterfaceStability.Evolving
    -public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
    +public interface StreamingWriteSupportProvider extends DataSourceV2, BaseStreamingSink {
     
    -    /**
    -     * Creates an optional {@link StreamWriter} to save the data to this data source. Data
    -     * sources can return None if there is no writing needed to be done.
    -     *
    -     * @param queryId A unique string for the writing query. It's possible that there are many
    -     *                writing queries running at the same time, and the returned
    -     *                {@link DataSourceWriter} can use this id to distinguish itself from others.
    -     * @param schema the schema of the data to be written.
    -     * @param mode the output mode which determines what successive epoch output means to this
    -     *             sink, please refer to {@link OutputMode} for more details.
    -     * @param options the options for the returned data source writer, which is an immutable
    -     *                case-insensitive string-to-string map.
    -     */
    -    StreamWriter createStreamWriter(
    -        String queryId,
    -        StructType schema,
    -        OutputMode mode,
    -        DataSourceOptions options);
    +  /**
    +   * Creates an optional {@link StreamingWriteSupport} to save the data to this data source. Data
    +   * sources can return None if there is no writing needed to be done.
    +   *
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link StreamingWriteSupport} can use this id to distinguish itself from others.
    +   * @param schema the schema of the data to be written.
    +   * @param mode the output mode which determines what successive epoch output means to this
    +   *             sink, please refer to {@link OutputMode} for more details.
    +   * @param options the options for the returned data source writer, which is an immutable
    +   *                case-insensitive string-to-string map.
    +   */
    +  StreamingWriteSupport createStreamingWriteSupport(
    +    String queryId,
    --- End diff --
    
    This should also remove the query ID (equivalent to job ID?).
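
    i.e. something like (a hypothetical sketch of the trimmed signature):

        // Hypothetical sketch: no queryId, mirroring what the batch side would look
        // like without the jobId string.
        StreamingWriteSupport createStreamingWriteSupport(
            StructType schema,
            OutputMode mode,
            DataSourceOptions options);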


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r209996219
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -76,41 +76,43 @@ object DataSourceV2Strategy extends Strategy {
       /**
        * Applies column pruning to the data source, w.r.t. the references of the given expressions.
        *
    -   * @return new output attributes after column pruning.
    +   * @return the created `ScanConfig` (since column pruning is the last step of operator pushdown),
    +   *         and new output attributes after column pruning.
    --- End diff --
    
    Okay, I see what you mean that we can't currently push down everything that is done by a projection. I'm not sure how relevant that point is, though. The implementation is still allowed to project more columns than Spark requests based on the other operations that are pushed.
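
    For reference, the planning flow I picture looks roughly like this (a sketch; the builder and mixin names are taken from this PR, while `translatedFilters`, `requiredSchema` and `postScanFilters` are made-up locals):

        // Hypothetical sketch of the pushdown order implied above: push filters first,
        // prune columns last, then freeze everything into an immutable ScanConfig.
        ScanConfigBuilder builder = readSupport.newScanConfigBuilder();
        if (builder instanceof SupportsPushDownFilters) {
          postScanFilters = ((SupportsPushDownFilters) builder).pushFilters(translatedFilters);
        }
        if (builder instanceof SupportsPushDownRequiredColumns) {
          ((SupportsPushDownRequiredColumns) builder).pruneColumns(requiredSchema);
        }
        ScanConfig config = builder.build();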


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208098178
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -132,35 +134,15 @@ class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
       override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
     }
     
    -class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, schema: StructType)
    --- End diff --
    
    This is actually a batch writer, not a micro-batch one, and it is only used in tests. For the writer API, micro-batch and continuous share the same interface, so we only need one streaming write implementation.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208371927
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2.reader.streaming;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
    +import org.apache.spark.sql.sources.v2.reader.InputPartition;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfig;
    +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
    +
    +/**
    + * An interface that defines how to scan data from a data source for streaming processing in
    + * continuous mode.
    + */
    +@InterfaceStability.Evolving
    +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
    +
    +  /**
    +   * Returns a builder of {@link ScanConfig}. The builder can take some query-specific information
    +   * like which operators to push down, streaming offsets, etc., and keep this information in the
    +   * created {@link ScanConfig}.
    +   *
    +   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
    +   * need to take {@link ScanConfig} as an input.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  ScanConfigBuilder newScanConfigBuilder(Offset start);
    +
    +  /**
    +   * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s.
    +   *
    +   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
    +   * submitted.
    +   */
    +  @Override
    +  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
    --- End diff --
    
    Shouldn't this be `createContinuousReaderFactory`? If the method is the same across `BatchReadSupport`, `MicroBatchReadSupport`, and `ContinuousReadSupport`, then implementing both batch and continuous would require a factory that always returns both continuous and batch readers. Separate methods would allow each implementation to use a base class and add continuous or micro-batch support to different classes.
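
    Concretely, the split could look like this (a hypothetical sketch, just to illustrate the naming):

        // Hypothetical sketch: distinct factory methods so one implementation can mix
        // batch, micro-batch and continuous support without signature clashes.
        public interface BatchReadSupport extends ReadSupport {
          PartitionReaderFactory createBatchReaderFactory(ScanConfig config);
        }

        public interface ContinuousReadSupport extends StreamingReadSupport {
          ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
        }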


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22009: [SPARK-24882][SQL] improve data source v2 API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22009
  
    **[Test build #94823 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94823/testReport)** for PR 22009 at commit [`2018981`](https://github.com/apache/spark/commit/2018981f1261dc8c0b63572be78410b3274d0f4a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org