Posted to issues@spark.apache.org by "Markus Riedl-Ehrenleitner (Jira)" <ji...@apache.org> on 2021/03/23 14:06:00 UTC
[jira] [Created] (SPARK-34836) DataSourceV2Relation with column filter fails with ClassCastException at collectAsList
Markus Riedl-Ehrenleitner created SPARK-34836:
-------------------------------------------------
Summary: DataSourceV2Relation with column filter fails with ClassCastException at collectAsList
Key: SPARK-34836
URL: https://issues.apache.org/jira/browse/SPARK-34836
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.1
Reporter: Markus Riedl-Ehrenleitner
After upgrading to 3.1.1, multiple of our test cases fail with a ClassCastException at *DataFrame.collectAsList()*.
Produced exception:
{noformat}
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app')
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.1.jar:3.1.1]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) [spark-core_2.12-3.1.1.jar:3.1.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
2021-03-23 13:00:26.974 WARN [org.apache.spark.scheduler.TaskSetManager] Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.6 executor driver): java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.catalyst.expressions.UnsafeRow (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.catalyst.expressions.UnsafeRow are in unnamed module of loader 'app')
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834){noformat}
All test cases follow this pattern:
{code:java}
StructField aDouble = DataTypes.createStructField("aDouble", DataTypes.DoubleType, true);
StructField aDoubleArray = DataTypes.createStructField("aDoubleArray", DataTypes.createArrayType(DataTypes.DoubleType), true);
StructType schema = DataTypes.createStructType(Arrays.asList(aDouble, aDoubleArray));
Column aDoubleFilter = functions.column("aDouble").equalTo(functions.lit(1d));
Column aDoubleArrayFilter = functions.aggregate(
        functions.col("aDoubleArray"),
        functions.lit(false),
        (seed, column) -> seed.or(column.equalTo(functions.lit(1d))));
Column filter = aDoubleFilter.or(aDoubleArrayFilter);
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(new HashMap<>());
Dataset<Row> dataset = Dataset.ofRows(sparkSession, DataSourceV2Relation.create(new ReadTable(schema), null, null, options));
Dataset<Row> filtered = dataset.filter(filter);
List<Row> collected = filtered.collectAsList();
{code}
ReadTable is an implementation of *org.apache.spark.sql.connector.catalog.Table* with the schema given above.
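The report does not include ReadTable's source. For context, a minimal DataSource V2 table of this shape might look as follows; the class and method bodies here are assumptions for illustration, not the reporter's code:
{code:java}
import java.util.Collections;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical sketch of ReadTable: a batch-readable table that reports a
// fixed schema. Table + SupportsRead is the minimum for DataSourceV2Relation.
public class ReadTable implements Table, SupportsRead {
    private final StructType schema;

    public ReadTable(StructType schema) {
        this.schema = schema;
    }

    @Override
    public String name() { return "ReadTable"; }

    @Override
    public StructType schema() { return schema; }

    @Override
    public Set<TableCapability> capabilities() {
        return Collections.singleton(TableCapability.BATCH_READ);
    }

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        // Scan's only abstract method is readSchema(); a real implementation
        // would also override toBatch() to supply the PartitionReader below.
        return () -> (Scan) () -> schema;
    }
}
{code}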
*PartitionReader* returns these rows:
{code:java}
InternalRow.fromSeq(
        JavaConverters.asScalaBuffer(Arrays.asList(1d, ArrayData.toArrayData(new double[]{1d, 2d})))
)
InternalRow.fromSeq(
        JavaConverters.asScalaBuffer(Arrays.asList(3d, ArrayData.toArrayData(new double[]{4d, 5d})))
)
{code}
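The reader's source is not part of the report either; a PartitionReader emitting exactly those two rows could be sketched like this (class name and structure are assumptions):
{code:java}
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.connector.read.PartitionReader;
import scala.collection.JavaConverters;

// Hypothetical PartitionReader producing the two rows quoted above.
// InternalRow.fromSeq yields GenericInternalRow instances, which is the row
// class named in the ClassCastException.
public class TwoRowReader implements PartitionReader<InternalRow> {
    private final Iterator<InternalRow> rows = Arrays.asList(
        InternalRow.fromSeq(JavaConverters.asScalaBuffer(
            Arrays.<Object>asList(1d, ArrayData.toArrayData(new double[]{1d, 2d})))),
        InternalRow.fromSeq(JavaConverters.asScalaBuffer(
            Arrays.<Object>asList(3d, ArrayData.toArrayData(new double[]{4d, 5d}))))
    ).iterator();

    private InternalRow current;

    @Override
    public boolean next() {
        if (rows.hasNext()) {
            current = rows.next();
            return true;
        }
        return false;
    }

    @Override
    public InternalRow get() { return current; }

    @Override
    public void close() { }
}
{code}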
I couldn't reproduce the failure with, for example:
{code:java}
List<Row> rows = Arrays.asList(
        RowFactory.create(1d, Arrays.asList(1d, 2d)),
        RowFactory.create(2d, Arrays.asList(3d, 4d))
);
Dataset<Row> dataFrame = sparkSession.createDataFrame(rows, schema);
{code}
The tests succeed with 3.0.2.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org