You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "AJ Bousquet (Jira)" <ji...@apache.org> on 2022/01/29 00:02:00 UTC
[jira] [Commented] (SPARK-38059) Incorrect query ordering with flatMap() and distinct()
[ https://issues.apache.org/jira/browse/SPARK-38059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484023#comment-17484023 ]
AJ Bousquet commented on SPARK-38059:
-------------------------------------
I ported this to use the Spark Java Quick Start stand-alone app and see the same results:
{code:java}
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.stream.LongStream;
import java.util.List;
import java.util.stream.Collectors;
public class SimpleApp {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
StructType idSchema = DataTypes.createStructType(List.of(DataTypes.createStructField("id", DataTypes.LongType, false)));
StructType flatMapSchema = DataTypes.createStructType(List.of(
DataTypes.createStructField("id", DataTypes.LongType, false),
DataTypes.createStructField("subId", DataTypes.LongType, false)
));
Dataset<Row> inputDataset = spark.createDataset(
LongStream.range(0,5).mapToObj((id) -> RowFactory.create(id)).collect(Collectors.toList()),
RowEncoder.apply(idSchema)
);
inputDataset
.distinct()
.limit(2)
.flatMap((Row row) -> {
Long id = row.getLong(row.fieldIndex("id"));
return LongStream.range(6,8).mapToObj((subid) -> RowFactory.create(id, subid)).iterator();
}, RowEncoder.apply(flatMapSchema))
.write()
.mode("overwrite")
.csv("/Volumes/git/spark_38059/output_csv");
spark.stop();
}
}
{code}
> Incorrect query ordering with flatMap() and distinct()
> ------------------------------------------------------
>
> Key: SPARK-38059
> URL: https://issues.apache.org/jira/browse/SPARK-38059
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 3.0.2, 3.2.0
> Reporter: AJ Bousquet
> Priority: Major
>
> I have a Dataset of non-unique identifiers that I can use with {{Dataset::flatMap()}} to create multiple rows with sub-identifiers for each id. When I run the code below, the {{limit(2)}} call is placed _after_ the call to {{flatMap()}} in the optimized logical plan. This unexpectedly yields only 2 rows, when I would expect it to yield 6.
> {code:java}
> StructType idSchema = DataTypes.createStructType(List.of(DataTypes.createStructField("id", DataTypes.LongType, false)));
> StructType flatMapSchema = DataTypes.createStructType(List.of(
> DataTypes.createStructField("id", DataTypes.LongType, false),
> DataTypes.createStructField("subId", DataTypes.LongType, false)
> ));Dataset<Row> inputDataset = context.sparkSession().createDataset(
> LongStream.range(0,5).mapToObj((id) -> RowFactory.create(id)).collect(Collectors.toList()),
> RowEncoder.apply(idSchema)
> );
> return inputDataset
> .distinct()
> .limit(2)
> .flatMap((Row row) -> {
> Long id = row.getLong(row.fieldIndex("id")); return LongStream.range(6,8).mapToObj((subid) -> RowFactory.create(id, subid)).iterator();
> }, RowEncoder.apply(flatMapSchema)); {code}
> When run, the above code produces something like:
> ||id||subID||
> |0|6|
> |0|7|
> But I would expect something like:
> ||id||subID||
> |1|6|
> |1|7|
> |1|8|
> |0|6|
> |0|7|
> |0|8|
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org