Posted to user@spark.apache.org by Shubham Chaurasia <sh...@gmail.com> on 2019/09/19 13:24:48 UTC
Incorrect results in left_outer join in DSv2 implementation with filter pushdown - Spark 2.3.2
Hi,
Consider the following statements:
1)
> scala> val df = spark.read.format("com.shubham.MyDataSource").load
> scala> df.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> |  3| -3|
> |  4| -4|
> +---+---+
2)
> scala> val df1 = df.filter("i < 3")
> scala> df1.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> +---+---+
3)
> scala> df.join(df1, Seq("i"), "left_outer").show
> +---+---+---+
> |  i|  j|  j|
> +---+---+---+
> |  1| -1| -1|
> |  2| -2| -2|
> |  0|  0|  0|
> +---+---+---+
3) is not producing the right results for the left_outer join: rows i=3 and i=4 from df should still appear (with null in df1's j column), but they are missing.
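What I'd expect from a left_outer join is all five rows of df, with null on the right side where i >= 3, i.e. something like this (row order aside):
> +---+---+----+
> |  i|  j|   j|
> +---+---+----+
> |  0|  0|   0|
> |  1| -1|  -1|
> |  2| -2|  -2|
> |  3| -3|null|
> |  4| -4|null|
> +---+---+----+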
Here is the minimal code.
-------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
  }

  @Override
  public StructType readSchema() {
    return new StructType()
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // Return the filters that could NOT be pushed down; an empty array tells
    // Spark that this source handles all of them.
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("=======MyDataSourceReader.createDataReaderFactories=======");
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    // Single partition emitting rows in [0, 5), capped by any pushed "i < x" filter.
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  // Value of a pushed "attributeName < value" filter, or Integer.MAX_VALUE if none.
  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }
}
------------------------------------------------------------
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private final int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }

    @Override
    public boolean next() {
      current++;
      // Stop at the end of the range or at the pushed "i < x" bound, whichever comes first.
      return current < end && current < iLTFilter;
    }

    @Override
    public Row get() {
      // Each row is (i, -i).
      return new GenericRow(new Object[]{current, -current});
    }

    @Override
    public void close() {
    }
  }
}
------------------------------------------------------------
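For completeness, the entry point com.shubham.MyDataSource isn't pasted above; it's just the usual Spark 2.3 DataSourceV2 / ReadSupport wiring, roughly like this (a sketch only, nothing interesting happens there):
------------------------------------------------------------
package com.shubham;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;

import com.shubham.reader.MyDataSourceReader;

public class MyDataSource implements DataSourceV2, ReadSupport {
  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    // Create the reader shown above, passing the options map through.
    return new MyDataSourceReader(options.asMap());
  }
}
------------------------------------------------------------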
It seems that Spark is somehow applying the filter (i < 3) after the left_outer join as well, which is why we see the incorrect results in 3). However, I don't see any Filter node after the join in the plan.
== Physical Plan ==
> *(5) Project [i#136, j#137, j#228]
> +- SortMergeJoin [i#136], [i#227], LeftOuter
>    :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(i#136, 200)
>    :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
>    +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
>       +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)
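One more thing I notice in the plan: both sides of the join go through the same reader instance (com.shubham.reader.MyDataSourceReader@714bd7ad), with the right side being just a ReusedExchange of the left side's Exchange. Since only rows 0, 1 and 2 come out, it looks like the i < 3 pushdown done for df1 may also be limiting the scan that feeds the unfiltered df; the "Instantiated...." print should show whether only one MyDataSourceReader gets created for both DataFrames.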
Any ideas what might be going wrong?
Thanks,
Shubham