Posted to user@spark.apache.org by Shubham Chaurasia <sh...@gmail.com> on 2018/10/09 06:31:25 UTC

DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Hi All,

--Spark built with *tags/v2.4.0-rc2*

Consider the following DataSourceReader implementation:

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the instance field schema.
2) When planBatchInputPartitions() is called afterwards, it is invoked on a different instance of MyDataSourceReader, so I do not get the value of schema in planBatchInputPartitions().

How can I get the value of schema that was set in readSchema() inside planBatchInputPartitions()?

Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null


Thanks,
Shubham

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
I think this is expected behavior, though not what I think is reasonable in
the long term. To my knowledge, this is how the v1 sources behave, and v2
just reuses the same mechanism to instantiate sources and uses a new
interface for v2 features.

I think that the right approach is to use catalogs, which I've proposed in
#21306 <https://github.com/apache/spark/pull/21306>. A catalog would be
loaded by reflection just once and then configured. After that, the same
instance for a given Spark SQL session would be reused.

Because the catalog instantiates table instances that expose read and write
capabilities (ReadSupport, WriteSupport), it can choose how to manage the
life-cycle of those tables and can also cache instances to control how
table state changes after a table is loaded. (Iceberg does this to use a
fixed snapshot for all reads until the table is written to or is garbage
collected.)
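
For illustration only, here is a minimal, hypothetical sketch of the caching idea described above. It is not the interface proposed in #21306; the class names MyCatalog and MyTable are made up, and a real catalog would additionally be loaded by reflection and configured per session.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a catalog instance that is created once per session and
// hands out cached table instances, so a table (and whatever state its
// ReadSupport/WriteSupport carries, e.g. a pinned snapshot) survives across
// the analysis and physical-planning steps.
public class MyCatalog {
  private final Map<String, MyTable> cache = new ConcurrentHashMap<>();

  public MyTable loadTable(String name) {
    // one MyTable instance per table name for the lifetime of this catalog
    return cache.computeIfAbsent(name, MyTable::new);
  }

  public void invalidate(String name) {
    // drop the cached entry after a write so the next load sees the new state
    cache.remove(name);
  }
}

class MyTable {
  private final String name;

  MyTable(String name) {
    this.name = name;
  }

  String name() {
    return name;
  }
}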

rb

On Tue, Oct 9, 2018 at 8:30 PM Hyukjin Kwon <gu...@gmail.com> wrote:

> I took a look at the code.
>
> val source = classOf[MyDataSource].getCanonicalName
> spark.read.format(source).load().collect()
>
> It looks like the reader is indeed created twice.
>
> First call: it creates the reader first, to read the schema for the logical plan
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>
> Second call: it creates another one for the actual partitions in the physical plan
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
> org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
>
>
> Skimming the API doc for DataSourceReader on branch-2.4, I haven't found a
> guarantee that the reader is created only once. If that's documented
> somewhere, we should fix it in 2.4.0. If not, I think it's fine, since both
> calls happen on the driver side and it can be worked around with, for
> instance, a static class or a thread local in this case.
>
> Forwarding to the dev mailing list in case this is something we haven't
> foreseen.
>
> 2018년 10월 9일 (화) 오후 9:39, Shubham Chaurasia <sh...@gmail.com>님이
> 작성:
>
>> Alright, so it is a big project which uses a SQL store underneath.
>> I extracted out the minimal code and made a smaller project out of it and
>> still it is creating multiple instances.
>>
>> Here is my project:
>>
>> ├── my-datasource.iml
>> ├── pom.xml
>> ├── src
>> │   ├── main
>> │   │   ├── java
>> │   │   │   └── com
>> │   │   │       └── shubham
>> │   │   │           ├── MyDataSource.java
>> │   │   │           └── reader
>> │   │   │               └── MyDataSourceReader.java
>>
>>
>> MyDataSource.java
>> -------------------------------------------------
>>
>> package com.shubham;
>>
>> import com.shubham.reader.MyDataSourceReader;
>> import org.apache.spark.sql.SaveMode;
>> import org.apache.spark.sql.sources.v2.DataSourceOptions;
>> import org.apache.spark.sql.sources.v2.DataSourceV2;
>> import org.apache.spark.sql.sources.v2.ReadSupport;
>> import org.apache.spark.sql.sources.v2.WriteSupport;
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
>> import org.apache.spark.sql.types.StructType;
>>
>> import java.util.Optional;
>>
>> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>>
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>>     return Optional.empty();
>>   }
>> }
>>
>>
>> MyDataSourceReader.java
>> -------------------------------------------------
>>
>> package com.shubham.reader;
>>
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.reader.InputPartition;
>> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
>> import org.apache.spark.sql.types.StructType;
>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   private Map<String, String> options;
>>   private StructType schema;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     return new ArrayList<>();
>>   }
>> }
>>
>>
>> ----------------------------------------
>> spark-shell output
>> ----------------------------------------
>> scala> spark.read.format("com.shubham.MyDataSource").option("query",
>> "select * from some_table").load.show
>>
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader:
>> Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
>> MyDataSourceReader.readSchema:
>> com.shubham.reader.MyDataSourceReader@69fa5536 schema:
>> StructType(StructField(col1,IntegerType,true),
>> StructField(col2,StringType,true))
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader:
>> Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
>> MyDataSourceReader.planBatchInputPartitions:
>> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
>> +----+----+
>> |col1|col2|
>> +----+----+
>> +----+----+
>>
>>
>> Here 2 instances of reader, MyDataSourceReader@69fa5536 and
>> MyDataSourceReader@3095c449 are being created. Consequently schema is
>> null in MyDataSourceReader@3095c449.
>>
>> Am I not doing it the correct way?
>>
>> Thanks,
>> Shubham
>>
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <As...@rsa.com>
>> wrote:
>>
>>> I am using v2.4.0-RC2
>>>
>>>
>>>
>>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns
>>> null). How are you calling it?
>>>
>>>
>>>
>>> When I do:
>>>
>>> val df = spark.read.format(mypackage).load().show()
>>>
>>> I am getting a single creation, how are you creating the reader?
>>>
>>>
>>>
>>> Thanks,
>>>
>>>         Assaf
>>>
>>>
>>>
>>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>>> *Sent:* Tuesday, October 9, 2018 2:02 PM
>>> *To:* Mendelson, Assaf; user@spark.apache.org
>>> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> [EXTERNAL EMAIL]
>>> Please report any suspicious attachments, links, or requests for
>>> sensitive information.
>>>
>>> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>>>
>>>
>>>
>>> Full Code:
>>>
>>>
>>>
>>> MyDataSource is the entry point which simply creates Reader and Writer
>>>
>>>
>>>
>>> public class MyDataSource implements DataSourceV2, WriteSupport,
>>> ReadSupport, SessionConfigSupport {
>>>
>>>
>>>
>>>   @Override public DataSourceReader createReader(DataSourceOptions
>>> options) {
>>>
>>>     return new MyDataSourceReader(options.asMap());
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public Optional<DataSourceWriter> createWriter(String jobId,
>>> StructType schema,
>>>
>>>       SaveMode mode, DataSourceOptions options) {
>>>
>>>     // creates a dataSourcewriter here..
>>>
>>>     return Optional.of(dataSourcewriter);
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override public String keyPrefix() {
>>>
>>>     return "myprefix";
>>>
>>>   }
>>>
>>>
>>>
>>> }
>>>
>>>
>>>
>>> public class MyDataSourceReader implements DataSourceReader,
>>> SupportsScanColumnarBatch {
>>>
>>>
>>>
>>>   StructType schema = null;
>>>
>>>   Map<String, String> options;
>>>
>>>
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>
>>>     System.out.println("MyDataSourceReader.MyDataSourceReader:
>>> Instantiated...." + this);
>>>
>>>     this.options = options;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>
>>>     //variable this.schema is null here since readSchema() was called on
>>> a different instance
>>>
>>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>>> this + " schema: " + this.schema);
>>>
>>>     //more logic......
>>>
>>>     return null;
>>>
>>>   }
>>>
>>>
>>>
>>>   @Override
>>>
>>>   public StructType readSchema() {
>>>
>>>     //some logic to discover schema
>>>
>>>     this.schema = (new StructType())
>>>
>>>         .add("col1", "int")
>>>
>>>         .add("col2", "string");
>>>
>>>     System.out.println("MyDataSourceReader.readSchema: " + this + "
>>> schema: " + this.schema);
>>>
>>>     return this.schema;
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com>
>>> wrote:
>>>
>>> Could you add a fuller code example? I tried to reproduce it in my
>>> environment and I am getting just one instance of the reader…
>>>
>>>
>>>
>>> Thanks,
>>>
>>>         Assaf
>>>
>>>
>>>
>>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>>> *Sent:* Tuesday, October 9, 2018 9:31 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* DataSourceV2 APIs creating multiple instances of
>>> DataSourceReader and hence not preserving the state
>>>
>>>
>>>
>>> [EXTERNAL EMAIL]
>>> Please report any suspicious attachments, links, or requests for
>>> sensitive information.
>>>
>>> Hi All,
>>>
>>>
>>>
>>> --Spark built with *tags/v2.4.0-rc2*
>>>
>>>
>>>
>>> Consider following DataSourceReader implementation:
>>>
>>>
>>>
>>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>>
>>>   StructType schema = null;
>>>   Map<String, String> options;
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>>     this.options = options;
>>>   }
>>>
>>>   @Override
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>     //variable this.schema is null here since readSchema() was called on a different instance
>>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>>     //more logic......
>>>     return null;
>>>   }
>>>
>>>   @Override
>>>   public StructType readSchema() {
>>>     //some logic to discover schema
>>>     this.schema = (new StructType())
>>>         .add("col1", "int")
>>>         .add("col2", "string");
>>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>>     return this.schema;
>>>   }
>>> }
>>>
>>> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>>>
>>> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>>>
>>>
>>>
>>> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>>>
>>>
>>>
>>> Console Logs:
>>>
>>>
>>>
>>> scala> mysource.executeQuery("select * from movie").show
>>>
>>>
>>>
>>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>>>
>>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>>>
>>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>>>
>>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Shubham
>>>
>>>
>>>
>>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Hyukjin Kwon <gu...@gmail.com>.
I took a look at the code.

val source = classOf[MyDataSource].getCanonicalName
spark.read.format(source).load().collect()

It looks like the reader is indeed created twice.

First call: it creates the reader first, to read the schema for the logical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

Second call: it creates another one for the actual partitions in the physical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)


Skimming the API doc for DataSourceReader on branch-2.4, I haven't found a
guarantee that the reader is created only once. If that's documented
somewhere, we should fix it in 2.4.0. If not, I think it's fine, since both
calls happen on the driver side and it can be worked around with, for
instance, a static class or a thread local in this case.
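
Below is a minimal sketch of that driver-side workaround, assuming only that both reader instances are created in the same driver JVM. The helper class SchemaCache and the discoverSchema method are made-up names for illustration, not part of the DataSourceV2 API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.apache.spark.sql.types.StructType;

// Hypothetical helper: a JVM-wide cache keyed by the data source options, so the
// schema discovered by the reader created for the logical plan can be reused by
// the second reader created during physical planning.
public class SchemaCache {
  private static final Map<String, StructType> CACHE = new ConcurrentHashMap<>();

  public static StructType getOrDiscover(Map<String, String> options, Supplier<StructType> discover) {
    // the stringified option map is a simplistic key; a real source would pick a stable one
    return CACHE.computeIfAbsent(options.toString(), key -> discover.get());
  }
}

// In MyDataSourceReader, both readSchema() and planBatchInputPartitions() could then use:
//   StructType schema = SchemaCache.getOrDiscover(options, this::discoverSchema);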

Forwarding to the dev mailing list in case this is something we haven't
foreseen.

2018년 10월 9일 (화) 오후 9:39, Shubham Chaurasia <sh...@gmail.com>님이
작성:

> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and
> still it is creating multiple instances.
>
> Here is my project:
>
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │       └── shubham
> │   │   │           ├── MyDataSource.java
> │   │   │           └── reader
> │   │   │               └── MyDataSourceReader.java
>
>
> MyDataSource.java
> -------------------------------------------------
>
> package com.shubham;
>
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
>
> import java.util.Optional;
>
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>     return Optional.empty();
>   }
> }
>
>
> MyDataSourceReader.java
> -------------------------------------------------
>
> package com.shubham.reader;
>
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   private Map<String, String> options;
>   private StructType schema;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public StructType readSchema() {
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     return new ArrayList<>();
>   }
> }
>
>
> ----------------------------------------
> spark-shell output
> ----------------------------------------
> scala> spark.read.format("com.shubham.MyDataSource").option("query",
> "select * from some_table").load.show
>
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@69fa5536 schema:
> StructType(StructField(col1,IntegerType,true),
> StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions:
> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +----+----+
> |col1|col2|
> +----+----+
> +----+----+
>
>
> Here 2 instances of reader, MyDataSourceReader@69fa5536 and
> MyDataSourceReader@3095c449 are being created. Consequently schema is
> null in MyDataSourceReader@3095c449.
>
> Am I not doing it the correct way?
>
> Thanks,
> Shubham
>
> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <As...@rsa.com>
> wrote:
>
>> I am using v2.4.0-RC2
>>
>>
>>
>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
>> How are you calling it?
>>
>>
>>
>> When I do:
>>
>> val df = spark.read.format(mypackage).load().show()
>>
>> I am getting a single creation, how are you creating the reader?
>>
>>
>>
>> Thanks,
>>
>>         Assaf
>>
>>
>>
>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>> *Sent:* Tuesday, October 9, 2018 2:02 PM
>> *To:* Mendelson, Assaf; user@spark.apache.org
>> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
>> DataSourceReader and hence not preserving the state
>>
>>
>>
>> [EXTERNAL EMAIL]
>> Please report any suspicious attachments, links, or requests for
>> sensitive information.
>>
>> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>>
>>
>>
>> Full Code:
>>
>>
>>
>> MyDataSource is the entry point which simply creates Reader and Writer
>>
>>
>>
>> public class MyDataSource implements DataSourceV2, WriteSupport,
>> ReadSupport, SessionConfigSupport {
>>
>>
>>
>>   @Override public DataSourceReader createReader(DataSourceOptions
>> options) {
>>
>>     return new MyDataSourceReader(options.asMap());
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType
>> schema,
>>
>>       SaveMode mode, DataSourceOptions options) {
>>
>>     // creates a dataSourcewriter here..
>>
>>     return Optional.of(dataSourcewriter);
>>
>>   }
>>
>>
>>
>>   @Override public String keyPrefix() {
>>
>>     return "myprefix";
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>> public class MyDataSourceReader implements DataSourceReader,
>> SupportsScanColumnarBatch {
>>
>>
>>
>>   StructType schema = null;
>>
>>   Map<String, String> options;
>>
>>
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>
>>     System.out.println("MyDataSourceReader.MyDataSourceReader:
>> Instantiated...." + this);
>>
>>     this.options = options;
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>
>>     //variable this.schema is null here since readSchema() was called on
>> a different instance
>>
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>> this + " schema: " + this.schema);
>>
>>     //more logic......
>>
>>     return null;
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public StructType readSchema() {
>>
>>     //some logic to discover schema
>>
>>     this.schema = (new StructType())
>>
>>         .add("col1", "int")
>>
>>         .add("col2", "string");
>>
>>     System.out.println("MyDataSourceReader.readSchema: " + this + "
>> schema: " + this.schema);
>>
>>     return this.schema;
>>
>>   }
>>
>> }
>>
>>
>>
>> Thanks,
>>
>> Shubham
>>
>>
>>
>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com>
>> wrote:
>>
>> Could you add a fuller code example? I tried to reproduce it in my
>> environment and I am getting just one instance of the reader…
>>
>>
>>
>> Thanks,
>>
>>         Assaf
>>
>>
>>
>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>> *Sent:* Tuesday, October 9, 2018 9:31 AM
>> *To:* user@spark.apache.org
>> *Subject:* DataSourceV2 APIs creating multiple instances of
>> DataSourceReader and hence not preserving the state
>>
>>
>>
>> [EXTERNAL EMAIL]
>> Please report any suspicious attachments, links, or requests for
>> sensitive information.
>>
>> Hi All,
>>
>>
>>
>> --Spark built with *tags/v2.4.0-rc2*
>>
>>
>>
>> Consider following DataSourceReader implementation:
>>
>>
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   StructType schema = null;
>>   Map<String, String> options;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     //variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     //more logic......
>>     return null;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     //some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>>
>> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>>
>> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>>
>>
>>
>> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>>
>>
>>
>> Console Logs:
>>
>>
>>
>> scala> mysource.executeQuery("select * from movie").show
>>
>>
>>
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>>
>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>>
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>>
>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>
>>
>>
>> Thanks,
>>
>> Shubham
>>
>>
>>
>>

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Jörn Franke <jo...@gmail.com>.
Generally, please avoid System.out.println and use a logger instead - even for examples. People may take these examples from here and put them in their production code.
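
As a small sketch of what that could look like in the example reader, using SLF4J (the logging facade Spark itself uses); the class and field names here are only illustrative:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDataSourceReaderWithLogging {

  // one logger per class instead of System.out.println
  private static final Logger LOG = LoggerFactory.getLogger(MyDataSourceReaderWithLogging.class);

  private final Map<String, String> options;

  public MyDataSourceReaderWithLogging(Map<String, String> options) {
    // parameterized messages avoid string concatenation when the log level is disabled
    LOG.info("Instantiated reader {} with options {}", this, options);
    this.options = options;
  }
}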

> Am 09.10.2018 um 15:39 schrieb Shubham Chaurasia <sh...@gmail.com>:
> 
> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and still it is creating multiple instances. 
> 
> Here is my project:
> 
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │       └── shubham
> │   │   │           ├── MyDataSource.java
> │   │   │           └── reader
> │   │   │               └── MyDataSourceReader.java
> 
> 
> MyDataSource.java
> -------------------------------------------------
> package com.shubham;
> 
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
> 
> import java.util.Optional;
> 
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
> 
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
> 
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>     return Optional.empty();
>   }
> }
> 
> MyDataSourceReader.java
> -------------------------------------------------
> package com.shubham.reader;
> 
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
> 
>   private Map<String, String> options;
>   private StructType schema;
> 
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
> 
>   @Override
>   public StructType readSchema() {
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> 
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     return new ArrayList<>();
>   }
> }
> 
> ----------------------------------------
> spark-shell output
> ----------------------------------------
> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show
> 
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +----+----+
> |col1|col2|
> +----+----+
> +----+----+
> 
> 
> Here 2 instances of reader, MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449 are being created. Consequently schema is null in MyDataSourceReader@3095c449.
> 
> Am I not doing it the correct way?
> 
> Thanks,
> Shubham
> 
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <As...@rsa.com> wrote:
>> I am using v2.4.0-RC2
>> 
>>  
>> 
>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). How are you calling it?
>> 
>>  
>> 
>> When I do:
>> 
>> val df = spark.read.format(mypackage).load().show()
>> 
>> I am getting a single creation, how are you creating the reader?
>> 
>>  
>> 
>> Thanks,
>> 
>>         Assaf
>> 
>>  
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com] 
>> Sent: Tuesday, October 9, 2018 2:02 PM
>> To: Mendelson, Assaf; user@spark.apache.org
>> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>> 
>>  
>> 
>> [EXTERNAL EMAIL] 
>> Please report any suspicious attachments, links, or requests for sensitive information.
>> 
>> Thanks Assaf, you tried with tags/v2.4.0-rc2?
>> 
>>  
>> 
>> Full Code:
>> 
>>  
>> 
>> MyDataSource is the entry point which simply creates Reader and Writer
>> 
>>  
>> 
>> public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {
>> 
>>  
>> 
>>   @Override public DataSourceReader createReader(DataSourceOptions options) {
>> 
>>     return new MyDataSourceReader(options.asMap());
>> 
>>   }
>> 
>>  
>> 
>>   @Override
>> 
>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
>> 
>>       SaveMode mode, DataSourceOptions options) {
>> 
>>     // creates a dataSourcewriter here..
>> 
>>     return Optional.of(dataSourcewriter);
>> 
>>   }
>> 
>>  
>> 
>>   @Override public String keyPrefix() {
>> 
>>     return "myprefix";
>> 
>>   }
>> 
>>  
>> 
>> }
>> 
>>  
>> 
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>> 
>>  
>> 
>>   StructType schema = null;
>> 
>>   Map<String, String> options;
>> 
>>  
>> 
>>   public MyDataSourceReader(Map<String, String> options) {
>> 
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>> 
>>     this.options = options;
>> 
>>   }
>> 
>>  
>> 
>>   @Override
>> 
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>> 
>>     //variable this.schema is null here since readSchema() was called on a different instance
>> 
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>> 
>>     //more logic......
>> 
>>     return null;
>> 
>>   }
>> 
>>  
>> 
>>   @Override
>> 
>>   public StructType readSchema() {
>> 
>>     //some logic to discover schema
>> 
>>     this.schema = (new StructType())
>> 
>>         .add("col1", "int")
>> 
>>         .add("col2", "string");
>> 
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>> 
>>     return this.schema;
>> 
>>   }
>> 
>> }
>> 
>>  
>> 
>> Thanks,
>> 
>> Shubham
>> 
>>  
>> 
>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com> wrote:
>> 
>> Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…
>> 
>>  
>> 
>> Thanks,
>> 
>>         Assaf
>> 
>>  
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com] 
>> Sent: Tuesday, October 9, 2018 9:31 AM
>> To: user@spark.apache.org
>> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>> 
>>  
>> 
>> [EXTERNAL EMAIL] 
>> Please report any suspicious attachments, links, or requests for sensitive information.
>> 
>> Hi All,
>> 
>>  
>> 
>> --Spark built with tags/v2.4.0-rc2
>> 
>>  
>> 
>> Consider following DataSourceReader implementation:
>> 
>>  
>> 
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>> 
>>   StructType schema = null;
>>   Map<String, String> options;
>> 
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>> 
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     //variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     //more logic......
>>     return null;
>>   }
>> 
>>   @Override
>>   public StructType readSchema() {
>>     //some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>>  
>> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>>  
>> Console Logs:
>>  
>> scala> mysource.executeQuery("select * from movie").show
>>  
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>  
>> 
>> Thanks,
>> 
>> Shubham
>> 
>>  

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Hyukjin Kwon <gu...@gmail.com>.
I took a look at the code.

val source = classOf[MyDataSource].getCanonicalName
spark.read.format(source).load().collect()

It does indeed look like the reader is created twice.

First call: it looks like Spark creates a reader first to read the schema for the logical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

Second call: it creates another reader to plan the actual partitions for the physical plan

test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.Iterator$class.foreach(Iterator.scala:891)
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)


Skimming the API doc for DataSourceReader on branch-2.4, I haven't found any
guarantee that the reader is created only once. If that's documented
somewhere, we should fix it in 2.4.0. If not, I think it's fine, since both
calls happen on the driver side and it is something you can work around, for
instance with a static holder class or a thread-local in this case.
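
For example, a rough sketch of the static-holder idea (just an illustration of the
workaround, not an officially supported pattern; the cachedSchema field name is mine):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  // Shared across every MyDataSourceReader instance created in the driver JVM,
  // so the instance used for planning sees the schema discovered earlier.
  private static volatile StructType cachedSchema;

  private final Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    if (cachedSchema == null) {
      // expensive schema discovery would go here
      cachedSchema = new StructType()
          .add("col1", "int")
          .add("col2", "string");
    }
    return cachedSchema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // Don't rely on per-instance state; go through the shared cache (or recompute).
    StructType schema = readSchema();
    System.out.println("planBatchInputPartitions schema: " + schema);
    return new ArrayList<>();
  }
}

The obvious caveat is that the cache is global per JVM, so per-query options would
need to become part of the cache key if the schema depends on them.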

Forwarding to the dev mailing list in case this is something we haven't
foreseen.

On Tue, Oct 9, 2018 at 9:39 PM Shubham Chaurasia <sh...@gmail.com> wrote:

> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and
> still it is creating multiple instances.
>
> Here is my project:
>
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │       └── shubham
> │   │   │           ├── MyDataSource.java
> │   │   │           └── reader
> │   │   │               └── MyDataSourceReader.java
>
>
> MyDataSource.java
> -------------------------------------------------
>
> package com.shubham;
>
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
>
> import java.util.Optional;
>
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>
>   public DataSourceReader createReader(DataSourceOptions options) {
>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>     return new MyDataSourceReader(options.asMap());
>   }
>
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>     return Optional.empty();
>   }
> }
>
>
> MyDataSourceReader.java
> -------------------------------------------------
>
> package com.shubham.reader;
>
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   private Map<String, String> options;
>   private StructType schema;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public StructType readSchema() {
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     return new ArrayList<>();
>   }
> }
>
>
> ----------------------------------------
> spark-shell output
> ----------------------------------------
> scala> spark.read.format("com.shubham.MyDataSource").option("query",
> "select * from some_table").load.show
>
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema:
> com.shubham.reader.MyDataSourceReader@69fa5536 schema:
> StructType(StructField(col1,IntegerType,true),
> StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader:
> Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions:
> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +----+----+
> |col1|col2|
> +----+----+
> +----+----+
>
>
> Here 2 instances of reader, MyDataSourceReader@69fa5536 and
> MyDataSourceReader@3095c449 are being created. Consequently schema is
> null in MyDataSourceReader@3095c449.
>
> Am I not doing it the correct way?
>
> Thanks,
> Shubham
>
> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <As...@rsa.com>
> wrote:
>
>> I am using v2.4.0-RC2
>>
>>
>>
>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
>> How are you calling it?
>>
>>
>>
>> When I do:
>>
>> Val df = spark.read.format(mypackage).load().show()
>>
>> I am getting a single creation, how are you creating the reader?
>>
>>
>>
>> Thanks,
>>
>>         Assaf
>>
>>
>>
>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>> *Sent:* Tuesday, October 9, 2018 2:02 PM
>> *To:* Mendelson, Assaf; user@spark.apache.org
>> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
>> DataSourceReader and hence not preserving the state
>>
>>
>>
>>
>> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>>
>>
>>
>> Full Code:
>>
>>
>>
>> MyDataSource is the entry point which simply creates Reader and Writer
>>
>>
>>
>> public class MyDataSource implements DataSourceV2, WriteSupport,
>> ReadSupport, SessionConfigSupport {
>>
>>
>>
>>   @Override public DataSourceReader createReader(DataSourceOptions
>> options) {
>>
>>     return new MyDataSourceReader(options.asMap());
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType
>> schema,
>>
>>       SaveMode mode, DataSourceOptions options) {
>>
>>     // creates a dataSourcewriter here..
>>
>>     return Optional.of(dataSourcewriter);
>>
>>   }
>>
>>
>>
>>   @Override public String keyPrefix() {
>>
>>     return "myprefix";
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>> public class MyDataSourceReader implements DataSourceReader,
>> SupportsScanColumnarBatch {
>>
>>
>>
>>   StructType schema = null;
>>
>>   Map<String, String> options;
>>
>>
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>
>>     System.out.println("MyDataSourceReader.MyDataSourceReader:
>> Instantiated...." + this);
>>
>>     this.options = options;
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>
>>     //variable this.schema is null here since readSchema() was called on
>> a different instance
>>
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
>> this + " schema: " + this.schema);
>>
>>     //more logic......
>>
>>     return null;
>>
>>   }
>>
>>
>>
>>   @Override
>>
>>   public StructType readSchema() {
>>
>>     //some logic to discover schema
>>
>>     this.schema = (new StructType())
>>
>>         .add("col1", "int")
>>
>>         .add("col2", "string");
>>
>>     System.out.println("MyDataSourceReader.readSchema: " + this + "
>> schema: " + this.schema);
>>
>>     return this.schema;
>>
>>   }
>>
>> }
>>
>>
>>
>> Thanks,
>>
>> Shubham
>>
>>
>>
>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com>
>> wrote:
>>
>> Could you add a fuller code example? I tried to reproduce it in my
>> environment and I am getting just one instance of the reader…
>>
>>
>>
>> Thanks,
>>
>>         Assaf
>>
>>
>>
>> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
>> *Sent:* Tuesday, October 9, 2018 9:31 AM
>> *To:* user@spark.apache.org
>> *Subject:* DataSourceV2 APIs creating multiple instances of
>> DataSourceReader and hence not preserving the state
>>
>>
>>
>>
>> Hi All,
>>
>>
>>
>> --Spark built with *tags/v2.4.0-rc2*
>>
>>
>>
>> Consider following DataSourceReader implementation:
>>
>>
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   StructType schema = null;
>>   Map<String, String> options;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     //variable this.schema is null here since readSchema() was called on a different instance
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     //more logic......
>>     return null;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     //some logic to discover schema
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>> }
>>
>> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>>
>> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>>
>>
>>
>> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>>
>>
>>
>> Console Logs:
>>
>>
>>
>> scala> mysource.executeQuery("select * from movie").show
>>
>>
>>
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>>
>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>>
>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>>
>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>
>>
>>
>> Thanks,
>>
>> Shubham
>>
>>
>>
>>

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Shubham Chaurasia <sh...@gmail.com>.
Alright, so it is a big project which uses a SQL store underneath.
I extracted the minimal code into a smaller project, and it is still
creating multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── shubham
│   │   │           ├── MyDataSource.java
│   │   │           └── reader
│   │   │               └── MyDataSourceReader.java


MyDataSource.java
-------------------------------------------------

package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a
new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}


MyDataSourceReader.java
-------------------------------------------------

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader:
Instantiated...." + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + "
schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: "
+ this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}


----------------------------------------
spark-shell output
----------------------------------------
scala> spark.read.format("com.shubham.MyDataSource").option("query",
"select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+


Here 2 instances of reader, MyDataSourceReader@69fa5536 and
MyDataSourceReader@3095c449 are being created. Consequently schema is null
in MyDataSourceReader@3095c449.
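
The only workaround I have found so far (just a sketch on my side, probably not how
the API is intended to be used) is to stop depending on instance state and have
planBatchInputPartitions() re-derive the schema when it finds it missing, e.g. inside
MyDataSourceReader:

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    if (this.schema == null) {
      // Spark may have called readSchema() on a different instance, so re-discover it here.
      this.schema = readSchema();
    }
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }

That avoids the null schema, but it also means the (potentially expensive) schema
discovery runs once per reader instance.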

Am I not doing it the correct way?

Thanks,
Shubham

On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <As...@rsa.com>
wrote:

> I am using v2.4.0-RC2
>
>
>
> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null).
> How are you calling it?
>
>
>
> When I do:
>
> Val df = spark.read.format(mypackage).load().show()
>
> I am getting a single creation, how are you creating the reader?
>
>
>
> Thanks,
>
>         Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
> *Sent:* Tuesday, October 9, 2018 2:02 PM
> *To:* Mendelson, Assaf; user@spark.apache.org
> *Subject:* Re: DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Thanks Assaf, you tried with *tags/v2.4.0-rc2?*
>
>
>
> Full Code:
>
>
>
> MyDataSource is the entry point which simply creates Reader and Writer
>
>
>
> public class MyDataSource implements DataSourceV2, WriteSupport,
> ReadSupport, SessionConfigSupport {
>
>
>
>   @Override public DataSourceReader createReader(DataSourceOptions
> options) {
>
>     return new MyDataSourceReader(options.asMap());
>
>   }
>
>
>
>   @Override
>
>   public Optional<DataSourceWriter> createWriter(String jobId, StructType
> schema,
>
>       SaveMode mode, DataSourceOptions options) {
>
>     // creates a dataSourcewriter here..
>
>     return Optional.of(dataSourcewriter);
>
>   }
>
>
>
>   @Override public String keyPrefix() {
>
>     return "myprefix";
>
>   }
>
>
>
> }
>
>
>
> public class MyDataSourceReader implements DataSourceReader,
> SupportsScanColumnarBatch {
>
>
>
>   StructType schema = null;
>
>   Map<String, String> options;
>
>
>
>   public MyDataSourceReader(Map<String, String> options) {
>
>     System.out.println("MyDataSourceReader.MyDataSourceReader:
> Instantiated...." + this);
>
>     this.options = options;
>
>   }
>
>
>
>   @Override
>
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>
>     //variable this.schema is null here since readSchema() was called on a
> different instance
>
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
> this + " schema: " + this.schema);
>
>     //more logic......
>
>     return null;
>
>   }
>
>
>
>   @Override
>
>   public StructType readSchema() {
>
>     //some logic to discover schema
>
>     this.schema = (new StructType())
>
>         .add("col1", "int")
>
>         .add("col2", "string");
>
>     System.out.println("MyDataSourceReader.readSchema: " + this + "
> schema: " + this.schema);
>
>     return this.schema;
>
>   }
>
> }
>
>
>
> Thanks,
>
> Shubham
>
>
>
> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com>
> wrote:
>
> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
>
>
> Thanks,
>
>         Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Hi All,
>
>
>
> --Spark built with *tags/v2.4.0-rc2*
>
>
>
> Consider following DataSourceReader implementation:
>
>
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     //variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     //more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     //some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>
>
>
> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>
>
>
> Console Logs:
>
>
>
> scala> mysource.executeQuery("select * from movie").show
>
>
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
>
>
> Thanks,
>
> Shubham
>
>
>
>

RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by "Mendelson, Assaf" <As...@rsa.com>.
I am using v2.4.0-RC2

The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). How are you calling it?

When I do:
val df = spark.read.format(mypackage).load().show()
I am getting a single creation, how are you creating the reader?

Thanks,
        Assaf

From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
Sent: Tuesday, October 9, 2018 2:02 PM
To: Mendelson, Assaf; user@spark.apache.org
Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state


Thanks Assaf, you tried with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

  @Override public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourcewriter here..
    return Optional.of(dataSourcewriter);
  }

  @Override public String keyPrefix() {
    return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com> wrote:
Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…

Thanks,
        Assaf

From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null

Thanks,
Shubham



Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by Shubham Chaurasia <sh...@gmail.com>.
Thanks Assaf, did you try with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point which simply creates Reader and Writer

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

  @Override public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourcewriter here..
    return Optional.of(dataSourcewriter);
  }

  @Override public String keyPrefix() {
    return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader:
Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " +
this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema:
" + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham

On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <As...@rsa.com>
wrote:

> Could you add a fuller code example? I tried to reproduce it in my
> environment and I am getting just one instance of the reader…
>
>
>
> Thanks,
>
>         Assaf
>
>
>
> *From:* Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
> *Sent:* Tuesday, October 9, 2018 9:31 AM
> *To:* user@spark.apache.org
> *Subject:* DataSourceV2 APIs creating multiple instances of
> DataSourceReader and hence not preserving the state
>
>
>
>
> Hi All,
>
>
>
> --Spark built with *tags/v2.4.0-rc2*
>
>
>
> Consider following DataSourceReader implementation:
>
>
>
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   StructType schema = null;
>   Map<String, String> options;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>     this.options = options;
>   }
>
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>     //variable this.schema is null here since readSchema() was called on a different instance
>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>     //more logic......
>     return null;
>   }
>
>   @Override
>   public StructType readSchema() {
>     //some logic to discover schema
>     this.schema = (new StructType())
>         .add("col1", "int")
>         .add("col2", "string");
>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>     return this.schema;
>   }
> }
>
> 1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.
>
> 2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().
>
>
>
> How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?
>
>
>
> Console Logs:
>
>
>
> scala> mysource.executeQuery("select * from movie").show
>
>
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>
> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>
> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>
> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>
>
>
> Thanks,
>
> Shubham
>
>
>
>

RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Posted by "Mendelson, Assaf" <As...@rsa.com>.
Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…

Thanks,
        Assaf

From: Shubham Chaurasia [mailto:shubh.chaurasia@gmail.com]
Sent: Tuesday, October 9, 2018 9:31 AM
To: user@spark.apache.org
Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state


Hi All,

--Spark built with tags/v2.4.0-rc2

Consider following DataSourceReader implementation:


public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

1) First readSchema() is called on MyDataSourceReader@instance1 which sets class variable schema.

2) Now when planBatchInputPartitions() is called, it is being called on a different instance of MyDataSourceReader and hence I am not getting the value of schema in method planBatchInputPartitions().



How can I get value of schema which was set in readSchema() method, in planBatchInputPartitions() method?



Console Logs:



scala> mysource.executeQuery("select * from movie").show



MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b

MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff

MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null

Thanks,
Shubham