You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shubham Chaurasia (Jira)" <ji...@apache.org> on 2020/02/03 13:54:01 UTC

[jira] [Created] (SPARK-30714) DSV2: Vectorized datasource does not have handling for ProlepticCalendar

Shubham Chaurasia created SPARK-30714:
-----------------------------------------

             Summary: DSV2: Vectorized datasource does not have handling for ProlepticCalendar
                 Key: SPARK-30714
                 URL: https://issues.apache.org/jira/browse/SPARK-30714
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
            Reporter: Shubham Chaurasia


Consider the following scenarios - 
1)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "1580736255261").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  1580736255261
Using LocalDateTime: 2020-02-03T13:24:15
+-----------------------+
|my_ts                  |
+-----------------------+
|2020-02-03 13:24:15.261|
+-----------------------+
{code}

In above output, we can see that both the timestamps (the one logged and the one in dataframe).

2)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "-62135596800000").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis:  -62135596800000
Using LocalDateTime: 0001-01-01T00:00
+-------------------+
|my_ts              |
+-------------------+
|0001-01-03 00:00:00|
+-------------------+
{code}

Here in 2), we can see that timestamp coming from DataReader is not converted properly according to proleptic calendar and hence the two timestamps are different. 

Code to Repro
DataSourceReader 
{code:java}
public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;

  private long timestampToProduce = -62135596800000L;

  public MyDataSourceReader(Map<String, String> options) {
    initOptions(options);
  }

  private void initOptions(Map<String, String> options) {
    String ts = options.get("ts_millis");
    if (ts != null) {
      timestampToProduce = Long.parseLong(ts);
    }
  }

  @Override public StructType readSchema() {
    StructField[] fields = new StructField[1];
    fields[0] = new StructField("my_ts", DataTypes.TimestampType, true, Metadata.empty());
    schema = new StructType(fields);
    return schema;
  }

  @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("MyDataSourceReader.createDataReaderFactories: ts_millis:  " + timestampToProduce);
    System.out.println("Using LocalDateTime: " +  LocalDateTime.ofEpochSecond(timestampToProduce/1000, 0, ZoneOffset.UTC));
    List<DataReaderFactory<ColumnarBatch>> dataReaderFactories = new ArrayList<>();
    dataReaderFactories.add(new MyVectorizedTSProducerFactory(schema, timestampToProduce));
    return dataReaderFactories;
  }
{code}

DataReaderFactory & DataReader
{code:java}
public class MyVectorizedTSProducerFactory implements DataReaderFactory<ColumnarBatch> {

  private final StructType schema;
  private long timestampValueToProduce;

  public MyVectorizedTSProducerFactory(StructType schema, long timestampValueToProduce) {
    this.schema = schema;
    this.timestampValueToProduce = timestampValueToProduce;
  }

  @Override public DataReader<ColumnarBatch> createDataReader() {
    return new MyVectorizedProducerReader(schema, timestampValueToProduce);
  }

  public static class MyVectorizedProducerReader implements DataReader<ColumnarBatch> {

    private final StructType schema;
    private long timestampValueToProduce;

    private ColumnarBatch columnarBatch;

    // return just one batch for now
    private boolean batchRemaining = true;

    public MyVectorizedProducerReader(StructType schema, long timestampValueToProduce) {
      this.schema = schema;
      this.timestampValueToProduce = timestampValueToProduce;
    }

    @Override public boolean next() {
      return batchRemaining;
    }

    @Override public ColumnarBatch get() {
      batchRemaining = false;
      OnHeapColumnVector[] onHeapColumnVectors = OnHeapColumnVector.allocateColumns(1, schema);
      for (OnHeapColumnVector vector : onHeapColumnVectors) {
        // convert millis to micros
        vector.putLong(0, timestampValueToProduce * 1000);
      }

      columnarBatch = new ColumnarBatch(onHeapColumnVectors);
      columnarBatch.setNumRows(1);
      return columnarBatch;
    }

    @Override public void close() {
      if (columnarBatch != null) {
        columnarBatch.close();
      }
    }
  }
}
{code}

Any workarounds/solutions for this? 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org