Posted to dev@spark.apache.org by Shubham Chaurasia <sh...@gmail.com> on 2019/09/19 13:24:48 UTC

Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2

Hi,

Consider the following statements:

1)
> scala> val df = spark.read.format("com.shubham.MyDataSource").load
> scala> df.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> |  3| -3|
> |  4| -4|
> +---+---+
2)
> scala> val df1 = df.filter("i < 3")
> scala> df1.show
> +---+---+
> |  i|  j|
> +---+---+
> |  0|  0|
> |  1| -1|
> |  2| -2|
> +---+---+
3)
> scala> df.join(df1, Seq("i"), "left_outer").show
> +---+---+---+
> |  i|  j|  j|
> +---+---+---+
> |  1| -1| -1|
> |  2| -2| -2|
> |  0|  0|  0|
> +---+---+---+


3) does not produce the correct result for the left_outer join.
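
For reference, plain left_outer semantics should return all five left-side rows, with null in the right-hand j column for the unmatched i = 3 and 4, i.e. something like:

+---+---+----+
|  i|  j|   j|
+---+---+----+
|  0|  0|   0|
|  1| -1|  -1|
|  2| -2|  -2|
|  3| -3|null|
|  4| -4|null|
+---+---+----+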

Here is the minimal code (the reader and the reader factory):

-------------------------------------------------------------------

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader:
Instantiated...." + this);
  }

  @Override
  public StructType readSchema() {
    return (new StructType())
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " +
Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // Filters that could not be pushed down (none here: this source handles all of them).
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
    // Resolve the pushed "i < N" bound, if any; the reader applies it while generating rows 0..4.
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }

}

------------------------------------------------------------
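
For completeness, the entry point registered as com.shubham.MyDataSource is just the standard DataSourceV2 + ReadSupport boilerplate; a minimal sketch of it:

package com.shubham;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;

import com.shubham.reader.MyDataSourceReader;  // the reader shown above (package name as in the plan output below)

public class MyDataSource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    // Create the reader with the options map; nothing else happens here.
    return new MyDataSourceReader(options.asMap());
  }
}

------------------------------------------------------------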

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }
    @Override
    public boolean next() {
      // Advance; stop at either the end of the data or the pushed "i <" bound.
      current++;
      return current < end && current < iLTFilter;
    }
    @Override
    public Row get() {
      return new GenericRow(new Object[]{current, -current});
    }
    @Override
    public void close() {
    }
  }
}

------------------------------------------------------------
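
As a quick sanity check that the reader itself honours the pushed bound, a throwaway main (ReaderSmokeTest is just an illustrative name, not part of my source) prints only i = 0, 1, 2 when given the same arguments the factory receives for the "i < 3" case, so the row filtering inside SimpleDataReader looks fine in isolation:

import java.io.IOException;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.DataReader;

public class ReaderSmokeTest {
  public static void main(String[] args) throws IOException {
    // start = 0, end = 5, iLTFilter = 3: the values used when "i < 3" has been pushed down.
    DataReader<Row> reader = new SimpleDataReaderFactory(0, 5, 3).createDataReader();
    while (reader.next()) {
      System.out.println(reader.get()); // prints [0,0], [1,-1], [2,-2]
    }
    reader.close();
  }
}

------------------------------------------------------------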

It seems that Spark is somehow applying the filter (i < 3) after the left_outer join as well, which is why we see incorrect results in 3).
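
Put another way (just restating the symptom), the three rows in 3) are exactly what a left_outer join would return if the i < 3 predicate were applied to both inputs, i.e. semantically the same as:

scala> df1.join(df1, Seq("i"), "left_outer").show
// by plain left_outer semantics this can only contain the rows with i = 0, 1, 2,
// which matches the (incorrect) output in 3)
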
However, I don't see any Filter node after the join in the physical plan:

> == Physical Plan ==
> *(5) Project [i#136, j#137, j#228]
> +- SortMergeJoin [i#136], [i#227], LeftOuter
>    :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(i#136, 200)
>    :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
>    +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
>       +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)


Any ideas what might be going wrong?

Thanks,
Shubham