You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shubham Chaurasia (Jira)" <ji...@apache.org> on 2020/02/03 14:05:00 UTC

[jira] [Updated] (SPARK-30714) DSV2: Vectorized datasource does not have handling for ProlepticCalendar

     [ https://issues.apache.org/jira/browse/SPARK-30714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shubham Chaurasia updated SPARK-30714:
--------------------------------------
    Description: 
Consider the following scenarios - 
1)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "1580736255261").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  1580736255261
Using LocalDateTime: 2020-02-03T13:24:15
+-----------------------+
|my_ts                  |
+-----------------------+
|2020-02-03 13:24:15.261|
+-----------------------+
{code}

In above output, we can see that both the timestamps (the one logged and the one in dataframe) are equal(upto seconds)

2)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "-62135596800000").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  -62135596800000
Using LocalDateTime: 0001-01-01T00:00
+-------------------+
|my_ts              |
+-------------------+
|0001-01-03 00:00:00|
+-------------------+
{code}

Here in 2), we can see that timestamp coming from DataReader is not converted properly according to proleptic calendar and hence the two timestamps are different. 

Code to Repro
DataSourceReader 
{code:java}
public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;

  private long timestampToProduce = -62135596800000L;

  public MyDataSourceReader(Map<String, String> options) {
    initOptions(options);
  }

  private void initOptions(Map<String, String> options) {
    String ts = options.get("ts_millis");
    if (ts != null) {
      timestampToProduce = Long.parseLong(ts);
    }
  }

  @Override public StructType readSchema() {
    StructField[] fields = new StructField[1];
    fields[0] = new StructField("my_ts", DataTypes.TimestampType, true, Metadata.empty());
    schema = new StructType(fields);
    return schema;
  }

  @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("MyDataSourceReader.createDataReaderFactories: ts_millis:  " + timestampToProduce);
    System.out.println("Using LocalDateTime: " +  LocalDateTime.ofEpochSecond(timestampToProduce/1000, 0, ZoneOffset.UTC));
    List<DataReaderFactory<ColumnarBatch>> dataReaderFactories = new ArrayList<>();
    dataReaderFactories.add(new MyVectorizedTSProducerFactory(schema, timestampToProduce));
    return dataReaderFactories;
  }
{code}

DataReaderFactory & DataReader
{code:java}
public class MyVectorizedTSProducerFactory implements DataReaderFactory<ColumnarBatch> {

  private final StructType schema;
  private long timestampValueToProduce;

  public MyVectorizedTSProducerFactory(StructType schema, long timestampValueToProduce) {
    this.schema = schema;
    this.timestampValueToProduce = timestampValueToProduce;
  }

  @Override public DataReader<ColumnarBatch> createDataReader() {
    return new MyVectorizedProducerReader(schema, timestampValueToProduce);
  }

  public static class MyVectorizedProducerReader implements DataReader<ColumnarBatch> {

    private final StructType schema;
    private long timestampValueToProduce;

    private ColumnarBatch columnarBatch;

    // return just one batch for now
    private boolean batchRemaining = true;

    public MyVectorizedProducerReader(StructType schema, long timestampValueToProduce) {
      this.schema = schema;
      this.timestampValueToProduce = timestampValueToProduce;
    }

    @Override public boolean next() {
      return batchRemaining;
    }

    @Override public ColumnarBatch get() {
      batchRemaining = false;
      OnHeapColumnVector[] onHeapColumnVectors = OnHeapColumnVector.allocateColumns(1, schema);
      for (OnHeapColumnVector vector : onHeapColumnVectors) {
        // convert millis to micros
        vector.putLong(0, timestampValueToProduce * 1000);
      }

      columnarBatch = new ColumnarBatch(onHeapColumnVectors);
      columnarBatch.setNumRows(1);
      return columnarBatch;
    }

    @Override public void close() {
      if (columnarBatch != null) {
        columnarBatch.close();
      }
    }
  }
}
{code}

Any workarounds/solutions for this? 



  was:
Consider the following scenarios - 
1)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "1580736255261").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  1580736255261
Using LocalDateTime: 2020-02-03T13:24:15
+-----------------------+
|my_ts                  |
+-----------------------+
|2020-02-03 13:24:15.261|
+-----------------------+
{code}

In above output, we can see that both the timestamps (the one logged and the one in dataframe).

2)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "-62135596800000").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  -62135596800000
Using LocalDateTime: 0001-01-01T00:00
+-------------------+
|my_ts              |
+-------------------+
|0001-01-03 00:00:00|
+-------------------+
{code}

Here in 2), we can see that timestamp coming from DataReader is not converted properly according to proleptic calendar and hence the two timestamps are different. 

Code to Repro
DataSourceReader 
{code:java}
public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;

  private long timestampToProduce = -62135596800000L;

  public MyDataSourceReader(Map<String, String> options) {
    initOptions(options);
  }

  private void initOptions(Map<String, String> options) {
    String ts = options.get("ts_millis");
    if (ts != null) {
      timestampToProduce = Long.parseLong(ts);
    }
  }

  @Override public StructType readSchema() {
    StructField[] fields = new StructField[1];
    fields[0] = new StructField("my_ts", DataTypes.TimestampType, true, Metadata.empty());
    schema = new StructType(fields);
    return schema;
  }

  @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("MyDataSourceReader.createDataReaderFactories: ts_millis:  " + timestampToProduce);
    System.out.println("Using LocalDateTime: " +  LocalDateTime.ofEpochSecond(timestampToProduce/1000, 0, ZoneOffset.UTC));
    List<DataReaderFactory<ColumnarBatch>> dataReaderFactories = new ArrayList<>();
    dataReaderFactories.add(new MyVectorizedTSProducerFactory(schema, timestampToProduce));
    return dataReaderFactories;
  }
{code}

DataReaderFactory & DataReader
{code:java}
public class MyVectorizedTSProducerFactory implements DataReaderFactory<ColumnarBatch> {

  private final StructType schema;
  private long timestampValueToProduce;

  public MyVectorizedTSProducerFactory(StructType schema, long timestampValueToProduce) {
    this.schema = schema;
    this.timestampValueToProduce = timestampValueToProduce;
  }

  @Override public DataReader<ColumnarBatch> createDataReader() {
    return new MyVectorizedProducerReader(schema, timestampValueToProduce);
  }

  public static class MyVectorizedProducerReader implements DataReader<ColumnarBatch> {

    private final StructType schema;
    private long timestampValueToProduce;

    private ColumnarBatch columnarBatch;

    // return just one batch for now
    private boolean batchRemaining = true;

    public MyVectorizedProducerReader(StructType schema, long timestampValueToProduce) {
      this.schema = schema;
      this.timestampValueToProduce = timestampValueToProduce;
    }

    @Override public boolean next() {
      return batchRemaining;
    }

    @Override public ColumnarBatch get() {
      batchRemaining = false;
      OnHeapColumnVector[] onHeapColumnVectors = OnHeapColumnVector.allocateColumns(1, schema);
      for (OnHeapColumnVector vector : onHeapColumnVectors) {
        // convert millis to micros
        vector.putLong(0, timestampValueToProduce * 1000);
      }

      columnarBatch = new ColumnarBatch(onHeapColumnVectors);
      columnarBatch.setNumRows(1);
      return columnarBatch;
    }

    @Override public void close() {
      if (columnarBatch != null) {
        columnarBatch.close();
      }
    }
  }
}
{code}

Any workarounds/solutions for this? 




> DSV2: Vectorized datasource does not have handling for ProlepticCalendar
> ------------------------------------------------------------------------
>
>                 Key: SPARK-30714
>                 URL: https://issues.apache.org/jira/browse/SPARK-30714
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: Shubham Chaurasia
>            Priority: Major
>
> Consider the following scenarios - 
> 1)
> {code:scala}
> scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "1580736255261").load.show(false)
> MyDataSourceReader.createDataReaderFactories: ts_millis:  1580736255261
> Using LocalDateTime: 2020-02-03T13:24:15
> +-----------------------+
> |my_ts                  |
> +-----------------------+
> |2020-02-03 13:24:15.261|
> +-----------------------+
> {code}
> In above output, we can see that both the timestamps (the one logged and the one in dataframe) are equal(upto seconds)
> 2)
> {code:scala}
> scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "-62135596800000").load.show(false)
> MyDataSourceReader.createDataReaderFactories: ts_millis:  -62135596800000
> Using LocalDateTime: 0001-01-01T00:00
> +-------------------+
> |my_ts              |
> +-------------------+
> |0001-01-03 00:00:00|
> +-------------------+
> {code}
> Here in 2), we can see that timestamp coming from DataReader is not converted properly according to proleptic calendar and hence the two timestamps are different. 
> Code to Repro
> DataSourceReader 
> {code:java}
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>   private StructType schema;
>   private long timestampToProduce = -62135596800000L;
>   public MyDataSourceReader(Map<String, String> options) {
>     initOptions(options);
>   }
>   private void initOptions(Map<String, String> options) {
>     String ts = options.get("ts_millis");
>     if (ts != null) {
>       timestampToProduce = Long.parseLong(ts);
>     }
>   }
>   @Override public StructType readSchema() {
>     StructField[] fields = new StructField[1];
>     fields[0] = new StructField("my_ts", DataTypes.TimestampType, true, Metadata.empty());
>     schema = new StructType(fields);
>     return schema;
>   }
>   @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>     System.out.println("MyDataSourceReader.createDataReaderFactories: ts_millis:  " + timestampToProduce);
>     System.out.println("Using LocalDateTime: " +  LocalDateTime.ofEpochSecond(timestampToProduce/1000, 0, ZoneOffset.UTC));
>     List<DataReaderFactory<ColumnarBatch>> dataReaderFactories = new ArrayList<>();
>     dataReaderFactories.add(new MyVectorizedTSProducerFactory(schema, timestampToProduce));
>     return dataReaderFactories;
>   }
> {code}
> DataReaderFactory & DataReader
> {code:java}
> public class MyVectorizedTSProducerFactory implements DataReaderFactory<ColumnarBatch> {
>   private final StructType schema;
>   private long timestampValueToProduce;
>   public MyVectorizedTSProducerFactory(StructType schema, long timestampValueToProduce) {
>     this.schema = schema;
>     this.timestampValueToProduce = timestampValueToProduce;
>   }
>   @Override public DataReader<ColumnarBatch> createDataReader() {
>     return new MyVectorizedProducerReader(schema, timestampValueToProduce);
>   }
>   public static class MyVectorizedProducerReader implements DataReader<ColumnarBatch> {
>     private final StructType schema;
>     private long timestampValueToProduce;
>     private ColumnarBatch columnarBatch;
>     // return just one batch for now
>     private boolean batchRemaining = true;
>     public MyVectorizedProducerReader(StructType schema, long timestampValueToProduce) {
>       this.schema = schema;
>       this.timestampValueToProduce = timestampValueToProduce;
>     }
>     @Override public boolean next() {
>       return batchRemaining;
>     }
>     @Override public ColumnarBatch get() {
>       batchRemaining = false;
>       OnHeapColumnVector[] onHeapColumnVectors = OnHeapColumnVector.allocateColumns(1, schema);
>       for (OnHeapColumnVector vector : onHeapColumnVectors) {
>         // convert millis to micros
>         vector.putLong(0, timestampValueToProduce * 1000);
>       }
>       columnarBatch = new ColumnarBatch(onHeapColumnVectors);
>       columnarBatch.setNumRows(1);
>       return columnarBatch;
>     }
>     @Override public void close() {
>       if (columnarBatch != null) {
>         columnarBatch.close();
>       }
>     }
>   }
> }
> {code}
> Any workarounds/solutions for this? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org