Posted to issues@spark.apache.org by "Shubham Chaurasia (Jira)" <ji...@apache.org> on 2020/02/03 14:05:00 UTC
[jira] [Updated] (SPARK-30714) DSV2: Vectorized datasource does not have handling for ProlepticCalendar
[ https://issues.apache.org/jira/browse/SPARK-30714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shubham Chaurasia updated SPARK-30714:
--------------------------------------
Description:
Consider the following scenarios:
1)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "1580736255261").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis: 1580736255261
Using LocalDateTime: 2020-02-03T13:24:15
+-----------------------+
|my_ts                  |
+-----------------------+
|2020-02-03 13:24:15.261|
+-----------------------+
{code}
In the above output, we can see that the two timestamps (the one logged and the one in the dataframe) are equal up to seconds; the logged LocalDateTime drops the millisecond part only because the reader passes 0 for nanoOfSecond.
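To double-check the expected value, here is a standalone snippet (class name is mine):
{code:java}
import java.time.Instant;

// The option value 1580736255261 ms corresponds to 2020-02-03T13:24:15.261Z,
// so the logged LocalDateTime (truncated to seconds) and the dataframe value
// agree, as expected.
public class CheckMillis {
  public static void main(String[] args) {
    System.out.println(Instant.ofEpochMilli(1580736255261L)); // 2020-02-03T13:24:15.261Z
  }
}
{code}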
2)
{code:scala}
scala> spark.read.format("com.shubham.MyDataSource").option("ts_millis", "-62135596800000").load.show(false)
MyDataSourceReader.createDataReaderFactories: ts_millis: -62135596800000
Using LocalDateTime: 0001-01-01T00:00
+-------------------+
|my_ts              |
+-------------------+
|0001-01-03 00:00:00|
+-------------------+
{code}
Here in 2), we can see that the timestamp coming from the DataReader is not converted according to the proleptic calendar, and hence the two timestamps differ: 0001-01-01 vs 0001-01-03, a two-day shift that matches the disagreement between the proleptic Gregorian calendar and the hybrid Julian/Gregorian calendar for year 1.
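That two-day disagreement can be reproduced outside Spark (standalone snippet, class name is mine):
{code:java}
import java.time.LocalDate;
import java.util.GregorianCalendar;
import java.util.TimeZone;

// java.time uses the proleptic Gregorian calendar, while
// java.util.GregorianCalendar is a hybrid Julian/Gregorian calendar.
// For year 1 the two disagree by exactly 2 days.
public class CalendarDiffDemo {
  public static void main(String[] args) {
    long prolepticDays = LocalDate.of(1, 1, 1).toEpochDay(); // -719162

    GregorianCalendar hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
    hybrid.clear();
    hybrid.set(1, 0, 1); // year 1, January (month is 0-based), day 1 -> Julian for dates before 1582
    long hybridDays = hybrid.getTimeInMillis() / 86_400_000L; // -719164

    System.out.println(prolepticDays - hybridDays); // 2
  }
}
{code}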
Code to reproduce:
DataSourceReader:
{code:java}
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private StructType schema;
  // -62135596800000 ms == 0001-01-01T00:00 UTC on the proleptic Gregorian calendar
  private long timestampToProduce = -62135596800000L;

  public MyDataSourceReader(Map<String, String> options) {
    initOptions(options);
  }

  private void initOptions(Map<String, String> options) {
    String ts = options.get("ts_millis");
    if (ts != null) {
      timestampToProduce = Long.parseLong(ts);
    }
  }

  @Override public StructType readSchema() {
    StructField[] fields = new StructField[1];
    fields[0] = new StructField("my_ts", DataTypes.TimestampType, true, Metadata.empty());
    schema = new StructType(fields);
    return schema;
  }

  @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("MyDataSourceReader.createDataReaderFactories: ts_millis: " + timestampToProduce);
    System.out.println("Using LocalDateTime: "
        + LocalDateTime.ofEpochSecond(timestampToProduce / 1000, 0, ZoneOffset.UTC));
    List<DataReaderFactory<ColumnarBatch>> dataReaderFactories = new ArrayList<>();
    dataReaderFactories.add(new MyVectorizedTSProducerFactory(schema, timestampToProduce));
    return dataReaderFactories;
  }
}
{code}
DataReaderFactory & DataReader:
{code:java}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyVectorizedTSProducerFactory implements DataReaderFactory<ColumnarBatch> {

  private final StructType schema;
  private final long timestampValueToProduce;

  public MyVectorizedTSProducerFactory(StructType schema, long timestampValueToProduce) {
    this.schema = schema;
    this.timestampValueToProduce = timestampValueToProduce;
  }

  @Override public DataReader<ColumnarBatch> createDataReader() {
    return new MyVectorizedProducerReader(schema, timestampValueToProduce);
  }

  public static class MyVectorizedProducerReader implements DataReader<ColumnarBatch> {

    private final StructType schema;
    private final long timestampValueToProduce;
    private ColumnarBatch columnarBatch;
    // return just one batch for now
    private boolean batchRemaining = true;

    public MyVectorizedProducerReader(StructType schema, long timestampValueToProduce) {
      this.schema = schema;
      this.timestampValueToProduce = timestampValueToProduce;
    }

    @Override public boolean next() {
      return batchRemaining;
    }

    @Override public ColumnarBatch get() {
      batchRemaining = false;
      OnHeapColumnVector[] onHeapColumnVectors = OnHeapColumnVector.allocateColumns(1, schema);
      for (OnHeapColumnVector vector : onHeapColumnVectors) {
        // TimestampType stores microseconds since epoch; convert millis to micros
        vector.putLong(0, timestampValueToProduce * 1000);
      }
      columnarBatch = new ColumnarBatch(onHeapColumnVectors);
      columnarBatch.setNumRows(1);
      return columnarBatch;
    }

    @Override public void close() {
      if (columnarBatch != null) {
        columnarBatch.close();
      }
    }
  }
}
{code}
Are there any workarounds or solutions for this?
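One direction that might work is an untested sketch (the helper below is hypothetical, assuming Spark 2.x interprets TimestampType micros on the hybrid calendar): re-encode the micros on the hybrid Julian/Gregorian calendar before writing them into the column vector.
{code:java}
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of the repro above: re-encode epoch millis
// computed on the proleptic Gregorian calendar (java.time) as micros on the
// hybrid Julian/Gregorian calendar.
public class RebaseSketch {

  static long prolepticMillisToHybridMicros(long millis) {
    // Wall-clock fields on the proleptic Gregorian calendar, at UTC.
    LocalDateTime wallClock = LocalDateTime.ofEpochSecond(
        Math.floorDiv(millis, 1000L),
        (int) Math.floorMod(millis, 1000L) * 1_000_000,
        ZoneOffset.UTC);
    // Timestamp.valueOf re-interprets the same fields on the hybrid calendar,
    // in the JVM default time zone; run with -Duser.timezone=UTC so both
    // sides use UTC.
    return Timestamp.valueOf(wallClock).getTime() * 1000L;
  }

  public static void main(String[] args) {
    // 0001-01-01T00:00 proleptic becomes micros two days earlier on the
    // hybrid calendar, which Spark should then render as 0001-01-01 00:00:00.
    System.out.println(prolepticMillisToHybridMicros(-62135596800000L));
  }
}
{code}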
> DSV2: Vectorized datasource does not have handling for ProlepticCalendar
> ------------------------------------------------------------------------
>
> Key: SPARK-30714
> URL: https://issues.apache.org/jira/browse/SPARK-30714
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2
> Reporter: Shubham Chaurasia
> Priority: Major
>