Posted to commits@druid.apache.org by ch...@apache.org on 2023/03/23 01:24:09 UTC

[druid] branch master updated: Unnest changes for moving the filter on right side of correlate to inside the unnest datasource (#13934)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ad133c06e Unnest changes for moving the filter on right side of correlate to inside the unnest datasource (#13934)
2ad133c06e is described below

commit 2ad133c06e01c2f6af8d930b6bdea2a6ad1c6ea3
Author: Soumyava <93...@users.noreply.github.com>
AuthorDate: Wed Mar 22 18:24:00 2023 -0700

    Unnest changes for moving the filter on right side of correlate to inside the unnest datasource (#13934)
    
    * Refactoring and bug fixes on top of unnest. The filter is now passed into the unnest cursors (a minimal sketch of the new datasource call follows this list). Added tests for scenarios such as:
    1. filter on unnested column which involves a left filter rewrite
    2. filter on unnested virtual column which pushes the filter to the right only and involves no rewrite
    3. not filters
    4. SQL functions applied on top of unnested column
    5. null present in first row of the column to be unnested
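
A minimal sketch of the new factory call, going off the three-argument UnnestDataSource.create
signature shown in the diff below; the table name, unnested column, output column name, and
filter value here are hypothetical stand-ins for illustration only:

    import org.apache.druid.math.expr.ExprMacroTable;
    import org.apache.druid.query.TableDataSource;
    import org.apache.druid.query.UnnestDataSource;
    import org.apache.druid.query.filter.SelectorDimFilter;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

    class UnnestFilterSketch
    {
      static UnnestDataSource example()
      {
        // Hypothetical: unnest "dim3" from table "foo" into output column "j0.unnest".
        // The filter that used to sit on the right side of the Correlate is now carried
        // by the datasource itself as the nullable "unnestFilter" argument.
        return UnnestDataSource.create(
            new TableDataSource("foo"),
            new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()),
            new SelectorDimFilter("j0.unnest", "a", null)
        );
      }
    }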
---
 .../org/apache/druid/query/UnnestDataSource.java   |  60 ++-
 .../druid/segment/UnnestDimensionCursor.java       |   3 +
 .../druid/segment/UnnestSegmentReference.java      |  12 +-
 .../apache/druid/segment/UnnestStorageAdapter.java | 162 +++++--
 .../apache/druid/segment/join/PostJoinCursor.java  |  10 +
 .../apache/druid/query/QueryRunnerTestHelper.java  |   3 +-
 .../groupby/UnnestGroupByQueryRunnerTest.java      |  12 +-
 .../query/scan/UnnestScanQueryRunnerTest.java      |   6 +-
 .../query/topn/UnnestTopNQueryRunnerTest.java      |   6 +-
 .../druid/segment/UnnestStorageAdapterTest.java    | 154 ++++++-
 .../sql/calcite/rel/DruidCorrelateUnnestRel.java   |  26 +-
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |   2 +-
 .../druid/sql/calcite/rel/DruidUnnestRel.java      |  20 +-
 .../rule/CorrelateFilterLTransposeRule.java        |   1 -
 .../rule/CorrelateFilterRTransposeRule.java        | 112 -----
 .../sql/calcite/rule/DruidFilterUnnestRule.java    | 109 +++++
 .../apache/druid/sql/calcite/rule/DruidRules.java  |   3 +-
 .../druid/sql/calcite/CalciteArraysQueryTest.java  | 505 +++++++++++++++++++--
 18 files changed, 986 insertions(+), 220 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
index d46cd226a8..acd984b644 100644
--- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -23,12 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.SegmentReference;
 import org.apache.druid.segment.UnnestSegmentReference;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.utils.JvmUtils;
 
+
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -48,22 +52,29 @@ public class UnnestDataSource implements DataSource
   private final DataSource base;
   private final VirtualColumn virtualColumn;
 
+  @Nullable
+  private final DimFilter unnestFilter;
+
   private UnnestDataSource(
       DataSource dataSource,
-      VirtualColumn virtualColumn
+      VirtualColumn virtualColumn,
+      DimFilter unnestFilter
   )
   {
     this.base = dataSource;
     this.virtualColumn = virtualColumn;
+    this.unnestFilter = unnestFilter;
   }
 
   @JsonCreator
   public static UnnestDataSource create(
       @JsonProperty("base") DataSource base,
-      @JsonProperty("virtualColumn") VirtualColumn virtualColumn
+      @JsonProperty("virtualColumn") VirtualColumn virtualColumn,
+      @Nullable @JsonProperty("unnestFilter") DimFilter unnestFilter
+
   )
   {
-    return new UnnestDataSource(base, virtualColumn);
+    return new UnnestDataSource(base, virtualColumn, unnestFilter);
   }
 
   @JsonProperty("base")
@@ -78,6 +89,12 @@ public class UnnestDataSource implements DataSource
     return virtualColumn;
   }
 
+  @JsonProperty("unnestFilter")
+  public DimFilter getUnnestFilter()
+  {
+    return unnestFilter;
+  }
+
   @Override
   public Set<String> getTableNames()
   {
@@ -96,7 +113,8 @@ public class UnnestDataSource implements DataSource
     if (children.size() != 1) {
       throw new IAE("Expected [1] child, got [%d]", children.size());
     }
-    return new UnnestDataSource(children.get(0), virtualColumn);
+
+    return new UnnestDataSource(children.get(0), virtualColumn, unnestFilter);
   }
 
   @Override
@@ -133,7 +151,8 @@ public class UnnestDataSource implements DataSource
             baseSegment ->
                 new UnnestSegmentReference(
                     segmentMapFn.apply(baseSegment),
-                    virtualColumn
+                    virtualColumn,
+                    unnestFilter
                 )
     );
   }
@@ -141,7 +160,7 @@ public class UnnestDataSource implements DataSource
   @Override
   public DataSource withUpdatedDataSource(DataSource newSource)
   {
-    return new UnnestDataSource(newSource, virtualColumn);
+    return new UnnestDataSource(newSource, virtualColumn, unnestFilter);
   }
 
   @Override
@@ -162,6 +181,17 @@ public class UnnestDataSource implements DataSource
     return current.getAnalysis();
   }
 
+
+  @Override
+  public String toString()
+  {
+    return "UnnestDataSource{" +
+           "base=" + base +
+           ", column='" + virtualColumn + '\'' +
+           ", unnestFilter='" + unnestFilter + '\'' +
+           '}';
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -172,25 +202,17 @@ public class UnnestDataSource implements DataSource
       return false;
     }
     UnnestDataSource that = (UnnestDataSource) o;
-    return virtualColumn.equals(that.virtualColumn)
-           && base.equals(that.base);
+    return base.equals(that.base) && virtualColumn.equals(that.virtualColumn) && Objects.equals(
+        unnestFilter,
+        that.unnestFilter
+    );
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(base, virtualColumn);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "UnnestDataSource{" +
-           "base=" + base +
-           ", column='" + virtualColumn + '\'' +
-           '}';
+    return Objects.hash(base, virtualColumn, unnestFilter);
   }
-
 }
 
 
diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
index 6c4c1a9534..d69eca6109 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
@@ -142,6 +142,9 @@ public class UnnestDimensionCursor implements Cursor
               @Override
               public boolean matches()
               {
+                if (indexedIntsForCurrentRow == null) {
+                  return false;
+                }
                 if (indexedIntsForCurrentRow.size() <= 0) {
                   return false;
                 }
diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
index 8f38713900..25c91f1e7d 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment;
 
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
@@ -41,14 +42,20 @@ public class UnnestSegmentReference implements SegmentReference
   private final SegmentReference baseSegment;
   private final VirtualColumn unnestColumn;
 
+  @Nullable
+  private final DimFilter unnestFilter;
+
+
 
   public UnnestSegmentReference(
       SegmentReference baseSegment,
-      VirtualColumn unnestColumn
+      VirtualColumn unnestColumn,
+      DimFilter unnestFilter
   )
   {
     this.baseSegment = baseSegment;
     this.unnestColumn = unnestColumn;
+    this.unnestFilter = unnestFilter;
   }
 
   @Override
@@ -100,7 +107,8 @@ public class UnnestSegmentReference implements SegmentReference
   {
     return new UnnestStorageAdapter(
         baseSegment.asStorageAdapter(),
-        unnestColumn
+        unnestColumn,
+        unnestFilter
     );
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
index 94c471d1a6..8506a99d02 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
@@ -27,17 +27,18 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.filter.BooleanFilter;
+import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ListIndexed;
-import org.apache.druid.segment.filter.AndFilter;
 import org.apache.druid.segment.filter.BoundFilter;
 import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.segment.filter.LikeFilter;
 import org.apache.druid.segment.filter.NotFilter;
+import org.apache.druid.segment.filter.OrFilter;
 import org.apache.druid.segment.filter.SelectorFilter;
 import org.apache.druid.segment.join.PostJoinCursor;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -59,18 +60,28 @@ import java.util.Set;
  */
 public class UnnestStorageAdapter implements StorageAdapter
 {
+  public StorageAdapter getBaseAdapter()
+  {
+    return baseAdapter;
+  }
+
   private final StorageAdapter baseAdapter;
   private final VirtualColumn unnestColumn;
   private final String outputColumnName;
 
+  @Nullable
+  private final DimFilter unnestFilter;
+
   public UnnestStorageAdapter(
       final StorageAdapter baseAdapter,
-      final VirtualColumn unnestColumn
+      final VirtualColumn unnestColumn,
+      final DimFilter unnestFilter
   )
   {
     this.baseAdapter = baseAdapter;
     this.unnestColumn = unnestColumn;
     this.outputColumnName = unnestColumn.getOutputName();
+    this.unnestFilter = unnestFilter;
   }
 
   @Override
@@ -83,9 +94,10 @@ public class UnnestStorageAdapter implements StorageAdapter
       @Nullable QueryMetrics<?> queryMetrics
   )
   {
-    final String inputColumn = getUnnestInputIfDirectAccess();
+    final String inputColumn = getUnnestInputIfDirectAccess(unnestColumn);
     final Pair<Filter, Filter> filterPair = computeBaseAndPostUnnestFilters(
         filter,
+        unnestFilter != null ? unnestFilter.toFilter() : null,
         virtualColumns,
         inputColumn,
         inputColumn == null || virtualColumns.exists(inputColumn)
@@ -168,6 +180,12 @@ public class UnnestStorageAdapter implements StorageAdapter
     return baseAdapter.getAvailableMetrics();
   }
 
+  @Nullable
+  public Filter getUnnestFilter()
+  {
+    return unnestFilter.toFilter();
+  }
+
   @Override
   public int getDimensionCardinality(String column)
   {
@@ -250,25 +268,81 @@ public class UnnestStorageAdapter implements StorageAdapter
    * Split queryFilter into pre- and post-correlate filters.
    *
    * @param queryFilter            query filter passed to makeCursors
+   * @param unnestFilter           filter on unnested column passed to PostUnnestCursor
    * @param queryVirtualColumns    query virtual columns passed to makeCursors
    * @param inputColumn            input column to unnest if it's a direct access; otherwise null
    * @param inputColumnCapabilites input column capabilities if known; otherwise null
-   *
    * @return pair of pre- and post-unnest filters
    */
-  private Pair<Filter, Filter> computeBaseAndPostUnnestFilters(
+  public Pair<Filter, Filter> computeBaseAndPostUnnestFilters(
       @Nullable final Filter queryFilter,
+      @Nullable final Filter unnestFilter,
       final VirtualColumns queryVirtualColumns,
       @Nullable final String inputColumn,
       @Nullable final ColumnCapabilities inputColumnCapabilites
   )
   {
+    /*
+    The goal of this function is to take a filter from the top of Correlate (queryFilter)
+    and a filter from the top of Uncollect (here unnest filter) and then do a rewrite
+    to generate filters to be passed to base cursor (filtersPushedDownToBaseCursor) and unnest cursor (filtersForPostUnnestCursor)
+    based on the following scenarios:
+
+    1. If there is an AND filter between unnested column and left e.g. select * from foo, UNNEST(dim3) as u(d3) where d3 IN (a,b) and m1 < 10
+       query filter -> m1 < 10
+       unnest filter -> d3 IN (a,b)
+
+       Output should be:
+       filtersPushedDownToBaseCursor -> dim3 IN (a,b) AND m1 < 10
+       filtersForPostUnnestCursor -> d3 IN (a,b)
+
+    2. There is an AND filter between unnested column and left e.g. select * from foo, UNNEST(ARRAY[dim1,dim2]) as u(d12) where d12 IN (a,b) and m1 < 10
+       query filter -> m1 < 10
+       unnest filter -> d12 IN (a,b)
+
+       Output should be:
+       filtersPushedDownToBaseCursor -> m1 < 10 (as unnest is on a virtual column it cannot be added to the pre-filter)
+       filtersForPostUnnestCursor -> d12 IN (a,b)
+
+    3. There is an OR filter involving unnested and left column e.g.  select * from foo, UNNEST(dim3) as u(d3) where d3 IN (a,b) or m1 < 10
+       query filter -> d3 IN (a,b) or m1 < 10
+       unnest filter -> null
+
+       Output should be:
+       filtersPushedDownToBaseCursor -> dim3 IN (a,b) or m1 < 10
+       filtersForPostUnnestCursor -> d3 IN (a,b) or m1 < 10
+
+     4. There is an OR filter involving unnested and left column e.g. select * from foo, UNNEST(ARRAY[dim1,dim2]) as u(d12) where d12 IN (a,b) or m1 < 10
+       query filter -> d12 IN (a,b) or m1 < 10
+       unnest filter -> null
+
+       Output should be:
+       filtersPushedDownToBaseCursor -> null (as the filter cannot be re-written due to presence of virtual columns)
+       filtersForPostUnnestCursor -> d12 IN (a,b) or m1 < 10
+     */
     class FilterSplitter
     {
-      final List<Filter> preFilters = new ArrayList<>();
-      final List<Filter> postFilters = new ArrayList<>();
+      final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
+      final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();
+
+      void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
+      {
+        if (filter == null) {
+          return;
+        }
+        if (!skipPreFilters) {
+          final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
+          if (newFilter != null) {
+            // Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
+            // any rows that do not match this filter at all.
+            filtersPushedDownToBaseCursor.add(newFilter);
+          }
+        }
+        // Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
+        filtersForPostUnnestCursor.add(filter);
+      }
 
-      void add(@Nullable final Filter filter)
+      void addPreFilter(@Nullable final Filter filter)
       {
         if (filter == null) {
           return;
@@ -276,53 +350,77 @@ public class UnnestStorageAdapter implements StorageAdapter
 
         final Set<String> requiredColumns = filter.getRequiredColumns();
 
-        // Run filter post-unnest if it refers to any virtual columns.
+        // Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
+        // that perhaps forces the code to use a ValueMatcher where an index would've been available,
+        // which can have real performance implications. This is an interim choice made to value correctness
+        // over performance. When we need to optimize this performance, we should be able to
+        // create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
+        // itself would stop carrying them and everything should be able to be pushed down.
         if (queryVirtualColumns.getVirtualColumns().length > 0) {
           for (String column : requiredColumns) {
             if (queryVirtualColumns.exists(column)) {
-              postFilters.add(filter);
+              filtersForPostUnnestCursor.add(filter);
               return;
             }
           }
         }
+        filtersPushedDownToBaseCursor.add(filter);
 
-        if (requiredColumns.contains(outputColumnName)) {
-          // Rewrite filter post-unnest if possible.
-          final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
-          if (newFilter != null) {
-            // Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
-            // any rows that do not match this filter at all.
-            preFilters.add(newFilter);
-          }
-          // Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
-          postFilters.add(filter);
-        } else {
-          preFilters.add(filter);
-        }
       }
     }
 
     final FilterSplitter filterSplitter = new FilterSplitter();
 
-    if (queryFilter instanceof AndFilter) {
-      for (Filter filter : ((AndFilter) queryFilter).getFilters()) {
-        filterSplitter.add(filter);
+    if (queryFilter != null) {
+      List<Filter> preFilterList = new ArrayList<>();
+      final int origFilterSize;
+      if (queryFilter.getRequiredColumns().contains(outputColumnName)) {
+        // outside filter contains unnested column
+        // requires check for OR
+        if (queryFilter instanceof OrFilter) {
+          origFilterSize = ((OrFilter) queryFilter).getFilters().size();
+          for (Filter filter : ((OrFilter) queryFilter).getFilters()) {
+            if (filter.getRequiredColumns().contains(outputColumnName)) {
+              final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
+                  filter,
+                  inputColumn,
+                  inputColumnCapabilites
+              );
+              if (newFilter != null) {
+                preFilterList.add(newFilter);
+              }
+            } else {
+              preFilterList.add(filter);
+            }
+          }
+          if (preFilterList.size() == origFilterSize) {
+            // all sub-filters were rewritten (or kept) successfully, so the entire OR filter can be pushed down
+            final OrFilter preOrFilter = new OrFilter(preFilterList);
+            filterSplitter.addPreFilter(preOrFilter);
+          }
+          // add the entire query filter as the post-unnest filter, to be used in the ValueMatcher
+          filterSplitter.addPostFilterWithPreFilterIfRewritePossible(queryFilter, true);
+        }
+      } else {
+        // normal case without any filter on unnested column
+        // add everything to pre-filters
+        filterSplitter.addPreFilter(queryFilter);
       }
-    } else {
-      filterSplitter.add(queryFilter);
     }
+    filterSplitter.addPostFilterWithPreFilterIfRewritePossible(unnestFilter, false);
 
     return Pair.of(
-        Filters.maybeAnd(filterSplitter.preFilters).orElse(null),
-        Filters.maybeAnd(filterSplitter.postFilters).orElse(null)
+        Filters.maybeAnd(filterSplitter.filtersPushedDownToBaseCursor).orElse(null),
+        Filters.maybeAnd(filterSplitter.filtersForPostUnnestCursor).orElse(null)
     );
   }
 
+
   /**
    * Returns the input of {@link #unnestColumn}, if it's a direct access; otherwise returns null.
    */
   @Nullable
-  private String getUnnestInputIfDirectAccess()
+  public String getUnnestInputIfDirectAccess(VirtualColumn unnestColumn)
   {
     if (unnestColumn instanceof ExpressionVirtualColumn) {
       return ((ExpressionVirtualColumn) unnestColumn).getParsedExpression().get().getBindingIfIdentifier();
@@ -333,7 +431,7 @@ public class UnnestStorageAdapter implements StorageAdapter
 
   /**
    * Rewrites a filter on {@link #outputColumnName} to operate on the input column from
-   * {@link #getUnnestInputIfDirectAccess()}, if possible.
+   * {@link #getUnnestInputIfDirectAccess(VirtualColumn)}, if possible.
    */
   @Nullable
   private Filter rewriteFilterOnUnnestColumnIfPossible(
diff --git a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
index d003da0bb3..11370fc167 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/PostJoinCursor.java
@@ -41,6 +41,9 @@ public class PostJoinCursor implements Cursor
   @Nullable
   private final ValueMatcher valueMatcher;
 
+  @Nullable
+  private final Filter postJoinFilter;
+
   private PostJoinCursor(Cursor baseCursor, VirtualColumns virtualColumns, @Nullable Filter filter)
   {
     this.baseCursor = baseCursor;
@@ -52,6 +55,7 @@ public class PostJoinCursor implements Cursor
     } else {
       this.valueMatcher = filter.makeMatcher(this.columnSelectorFactory);
     }
+    this.postJoinFilter = filter;
   }
 
   public static PostJoinCursor wrap(
@@ -86,6 +90,12 @@ public class PostJoinCursor implements Cursor
     return baseCursor.getTime();
   }
 
+  @Nullable
+  public Filter getPostJoinFilter()
+  {
+    return postJoinFilter;
+  }
+
   @Override
   public void advance()
   {
diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 2f4119af8f..aec318aa7a 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -111,7 +111,8 @@ public class QueryRunnerTestHelper
           "\"" + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION + "\"",
           null,
           ExprMacroTable.nil()
-      )
+      ),
+      null
   );
 
   public static final Granularity DAY_GRAN = Granularities.DAY;
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java
index a09d2fc256..826d612f67 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/UnnestGroupByQueryRunnerTest.java
@@ -239,7 +239,8 @@ public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
                 "\"" + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION + "\"",
                 null,
                 ExprMacroTable.nil()
-            )
+            ),
+            null
         ))
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
         .setDimensions(new DefaultDimensionSpec("quality", "alias"))
@@ -452,7 +453,8 @@ public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
                 "\"" + QueryRunnerTestHelper.PLACEMENTISH_DIMENSION + "\"",
                 null,
                 ExprMacroTable.nil()
-            )
+            ),
+            null
         ))
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
         .setDimensions(
@@ -564,7 +566,8 @@ public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
             "mv_to_array(placementish)",
             ColumnType.STRING_ARRAY,
             TestExprMacroTable.INSTANCE
-        )
+        ),
+        null
     );
 
     GroupByQuery query = makeQueryBuilder()
@@ -652,7 +655,8 @@ public class UnnestGroupByQueryRunnerTest extends InitializedNullHandlingTest
             "array(\"market\",\"quality\")",
             ColumnType.STRING,
             TestExprMacroTable.INSTANCE
-        )
+        ),
+        null
     );
 
     GroupByQuery query = makeQueryBuilder()
diff --git a/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java
index a0fa2b929e..5c18e4784f 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/UnnestScanQueryRunnerTest.java
@@ -164,7 +164,8 @@ public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest
                       "mv_to_array(placementish)",
                       ColumnType.STRING,
                       TestExprMacroTable.INSTANCE
-                  )
+                  ),
+                  null
               ))
               .columns(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
               .eternityInterval()
@@ -233,7 +234,8 @@ public class UnnestScanQueryRunnerTest extends InitializedNullHandlingTest
                       "array(\"market\",\"quality\")",
                       ColumnType.STRING,
                       TestExprMacroTable.INSTANCE
-                  )
+                  ),
+                  null
               ))
               .columns(QueryRunnerTestHelper.MARKET_DIMENSION, QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
               .eternityInterval()
diff --git a/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java
index c30022216b..e822913489 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/UnnestTopNQueryRunnerTest.java
@@ -258,7 +258,8 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
                 "mv_to_array(\"placementish\")",
                 ColumnType.STRING_ARRAY,
                 TestExprMacroTable.INSTANCE
-            )
+            ),
+            null
         ))
         .granularity(QueryRunnerTestHelper.ALL_GRAN)
         .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
@@ -340,7 +341,8 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
                 "array(\"market\",\"quality\")",
                 ColumnType.STRING,
                 TestExprMacroTable.INSTANCE
-            )
+            ),
+            null
         ))
         .granularity(QueryRunnerTestHelper.ALL_GRAN)
         .dimension(QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST)
diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
index d852547e4e..f34dcc2b6b 100644
--- a/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
@@ -22,17 +22,23 @@ package org.apache.druid.segment;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.filter.OrFilter;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.join.PostJoinCursor;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
@@ -40,11 +46,13 @@ import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.utils.CloseableUtils;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.joda.time.Interval;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 
@@ -55,6 +63,7 @@ public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
   private static IncrementalIndexStorageAdapter INCREMENTAL_INDEX_STORAGE_ADAPTER;
   private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
   private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER2;
   private static List<StorageAdapter> ADAPTERS;
   private static String COLUMNNAME = "multi-string1";
   private static String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
@@ -82,14 +91,23 @@ public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
     INCREMENTAL_INDEX_STORAGE_ADAPTER = new IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);
     UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
         INCREMENTAL_INDEX_STORAGE_ADAPTER,
-        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil())
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil()),
+        null
     );
 
     UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
         UNNEST_STORAGE_ADAPTER,
-        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME1, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil())
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME1, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    UNNEST_STORAGE_ADAPTER2 = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil()),
+        new SelectorDimFilter(OUTPUT_COLUMN_NAME, "1", null)
     );
 
+
     ADAPTERS = ImmutableList.of(
         UNNEST_STORAGE_ADAPTER,
         UNNEST_STORAGE_ADAPTER1
@@ -245,4 +263,136 @@ public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
     MatcherAssert.assertThat(column, CoreMatchers.instanceOf(ExpressionVirtualColumn.class));
     Assert.assertEquals("\"" + identifier + "\"", ((ExpressionVirtualColumn) column).getExpression());
   }
+
+  @Test
+  public void test_pushdown_or_filters_unnested_and_original_dimension_with_unnest_adapters()
+  {
+    final UnnestStorageAdapter unnestStorageAdapter = new UnnestStorageAdapter(
+        new TestStorageAdapter(INCREMENTAL_INDEX),
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    final VirtualColumn vc = unnestStorageAdapter.getUnnestColumn();
+
+    final String inputColumn = unnestStorageAdapter.getUnnestInputIfDirectAccess(vc);
+
+    final OrFilter baseFilter = new OrFilter(ImmutableList.of(
+        new SelectorDimFilter(OUTPUT_COLUMN_NAME, "1", null).toFilter(),
+        new SelectorDimFilter(inputColumn, "2", null).toFilter()
+    ));
+
+    final OrFilter expectedPushDownFilter = new OrFilter(ImmutableList.of(
+        new SelectorDimFilter(inputColumn, "1", null).toFilter(),
+        new SelectorDimFilter(inputColumn, "2", null).toFilter()
+    ));
+
+    final Sequence<Cursor> cursorSequence = unnestStorageAdapter.makeCursors(
+        baseFilter,
+        unnestStorageAdapter.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    final TestStorageAdapter base = (TestStorageAdapter) unnestStorageAdapter.getBaseAdapter();
+    final Filter pushDownFilter = base.getPushDownFilter();
+
+    Assert.assertEquals(expectedPushDownFilter, pushDownFilter);
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      Assert.assertEquals(cursor.getClass(), PostJoinCursor.class);
+      final Filter postFilter = ((PostJoinCursor) cursor).getPostJoinFilter();
+      // OR-case so base filter should match the postJoinFilter
+      Assert.assertEquals(baseFilter, postFilter);
+      return null;
+    });
+  }
+
+  @Test
+  public void test_pushdown_filters_unnested_dimension_with_unnest_adapters()
+  {
+    final UnnestStorageAdapter unnestStorageAdapter = new UnnestStorageAdapter(
+        new TestStorageAdapter(INCREMENTAL_INDEX),
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + COLUMNNAME + "\"", null, ExprMacroTable.nil()),
+        new SelectorDimFilter(OUTPUT_COLUMN_NAME, "1", null)
+    );
+
+    final VirtualColumn vc = unnestStorageAdapter.getUnnestColumn();
+
+    final String inputColumn = unnestStorageAdapter.getUnnestInputIfDirectAccess(vc);
+
+    final Filter expectedPushDownFilter =
+        new SelectorDimFilter(inputColumn, "1", null).toFilter();
+
+
+    final Sequence<Cursor> cursorSequence = unnestStorageAdapter.makeCursors(
+        null,
+        unnestStorageAdapter.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    final TestStorageAdapter base = (TestStorageAdapter) unnestStorageAdapter.getBaseAdapter();
+    final Filter pushDownFilter = base.getPushDownFilter();
+
+    Assert.assertEquals(expectedPushDownFilter, pushDownFilter);
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      Assert.assertEquals(cursor.getClass(), PostJoinCursor.class);
+      final Filter postFilter = ((PostJoinCursor) cursor).getPostJoinFilter();
+      Assert.assertEquals(unnestStorageAdapter.getUnnestFilter(), postFilter);
+
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      int count = 0;
+      while (!cursor.isDone()) {
+        Object dimSelectorVal = dimSelector.getObject();
+        if (dimSelectorVal == null) {
+          Assert.assertNull(dimSelectorVal);
+        }
+        cursor.advance();
+        count++;
+      }
+      Assert.assertEquals(1, count);
+      return null;
+    });
+  }
+}
+
+/**
+ * Class to test the flow of pushing down filters into the base cursor
+ * while using the UnnestStorageAdapter. This class keeps a reference to the filter
+ * that is pushed down to the base cursor, which serves as a checkpoint to validate
+ * that the right filter is being pushed down.
+ */
+class TestStorageAdapter extends IncrementalIndexStorageAdapter
+{
+
+  private Filter pushDownFilter;
+
+  public TestStorageAdapter(IncrementalIndex index)
+  {
+    super(index);
+  }
+
+  public Filter getPushDownFilter()
+  {
+    return pushDownFilter;
+  }
+
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable final Filter filter,
+      final Interval interval,
+      final VirtualColumns virtualColumns,
+      final Granularity gran,
+      final boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    this.pushDownFilter = filter;
+    return super.makeCursors(filter, interval, virtualColumns, gran, descending, queryMetrics);
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
index bcf358272f..889d30648f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
@@ -31,6 +31,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
@@ -44,10 +45,12 @@ import org.apache.druid.query.DataSource;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.UnnestDataSource;
+import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.expression.Expressions;
 import org.apache.druid.sql.calcite.expression.builtin.MultiValueStringToArrayOperatorConversion;
+import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.Calcites;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.table.RowSignatures;
@@ -63,7 +66,7 @@ import java.util.stream.Collectors;
  * Each correlate can be perceived as a join with the join type being inner
  * the left of a correlate as seen in the rule {@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule}
  * is the {@link DruidQueryRel} while the right will always be an {@link DruidUnnestRel}.
- *
+ * <p>
  * Since this is a subclass of DruidRel it is automatically considered by other rules that involves DruidRels.
  * Some example being SELECT_PROJECT and SORT_PROJECT rules in {@link org.apache.druid.sql.calcite.rule.DruidRules.DruidQueryRule}
  */
@@ -136,11 +139,13 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
     final DruidUnnestRel unnestDatasourceRel = (DruidUnnestRel) right;
     final DataSource leftDataSource;
     final RowSignature leftDataSourceSignature;
+    final Filter unnestFilter = unnestDatasourceRel.getUnnestFilter();
 
     if (right.getRowType().getFieldNames().size() != 1) {
       throw new CannotBuildQueryException("Cannot perform correlated join + UNNEST with more than one column");
     }
 
+
     if (computeLeftRequiresSubquery(leftDruidRel)) {
       // Left side is doing more than simple scan: generate a subquery.
       leftDataSource = new QueryDataSource(leftQuery.getQuery());
@@ -164,6 +169,20 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
 
     // Final output row signature.
     final RowSignature correlateRowSignature = getCorrelateRowSignature(correlateRel, leftQuery);
+    final DimFilter unnestFilterOnDataSource;
+    if (unnestFilter != null) {
+      RowSignature filterRowSignature = RowSignatures.fromRelDataType(ImmutableList.of(correlateRowSignature.getColumnName(
+          correlateRowSignature.size() - 1)), unnestFilter.getInput().getRowType());
+      unnestFilterOnDataSource = Filtration.create(DruidQuery.getDimFilter(
+                                               getPlannerContext(),
+                                               filterRowSignature,
+                                               null,
+                                               unnestFilter
+                                           ))
+                                           .optimizeFilterOnly(filterRowSignature).getDimFilter();
+    } else {
+      unnestFilterOnDataSource = null;
+    }
 
     return partialQuery.build(
         UnnestDataSource.create(
@@ -172,7 +191,8 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
                 correlateRowSignature.getColumnName(correlateRowSignature.size() - 1),
                 Calcites.getColumnTypeForRelDataType(rexNodeToUnnest.getType()),
                 getPlannerContext().getExprMacroTable()
-            )
+            ),
+            unnestFilterOnDataSource
         ),
         correlateRowSignature,
         getPlannerContext(),
@@ -282,7 +302,7 @@ public class DruidCorrelateUnnestRel extends DruidRel<DruidCorrelateUnnestRel>
   /**
    * Computes whether a particular left-side rel requires a subquery, or if we can operate on its underlying
    * datasource directly.
-   *
+   * <p>
    * Stricter than {@link DruidJoinQueryRel#computeLeftRequiresSubquery}: this method only allows scans (not mappings).
    * This is OK because any mapping or other simple projection would have been pulled above the {@link Correlate} by
    * {@link org.apache.druid.sql.calcite.rule.DruidCorrelateUnnestRule}.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 116acb0c5c..2a093e6a30 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -332,7 +332,7 @@ public class DruidQuery
   }
 
   @Nonnull
-  private static DimFilter getDimFilter(
+  public static DimFilter getDimFilter(
       final PlannerContext plannerContext,
       final RowSignature rowSignature,
       @Nullable final VirtualColumnRegistry virtualColumnRegistry,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestRel.java
index f7809e2fae..de4294a737 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnnestRel.java
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalValues;
@@ -56,16 +57,24 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
    * {@link org.apache.calcite.rex.RexFieldAccess}.
    */
   private final RexNode inputRexNode;
+  private final Filter unnestFilter;
 
   private DruidUnnestRel(
       final RelOptCluster cluster,
       final RelTraitSet traits,
       final RexNode inputRexNode,
+      final Filter unnestFilter,
       final PlannerContext plannerContext
   )
   {
     super(cluster, traits, plannerContext);
     this.inputRexNode = inputRexNode;
+    this.unnestFilter = unnestFilter;
+  }
+
+  public Filter getUnnestFilter()
+  {
+    return unnestFilter;
   }
 
   public static DruidUnnestRel create(
@@ -83,6 +92,7 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
         cluster,
         traits,
         unnestRexNode,
+        null,
         plannerContext
     );
   }
@@ -100,6 +110,7 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
           getCluster(),
           getTraitSet(),
           newInputRexNode,
+          unnestFilter,
           getPlannerContext()
       );
     }
@@ -125,6 +136,11 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
     throw new UnsupportedOperationException();
   }
 
+  public DruidUnnestRel withFilter(Filter f)
+  {
+    return new DruidUnnestRel(getCluster(), getTraitSet(), inputRexNode, f, getPlannerContext());
+  }
+
   /**
    * Returns a new rel with a new input. The output type is unchanged.
    */
@@ -134,6 +150,7 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
         getCluster(),
         getTraitSet(),
         newInputRexNode,
+        unnestFilter,
         getPlannerContext()
     );
   }
@@ -160,6 +177,7 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
         getCluster(),
         getTraitSet().replace(DruidConvention.instance()),
         inputRexNode,
+        unnestFilter,
         getPlannerContext()
     );
   }
@@ -167,7 +185,7 @@ public class DruidUnnestRel extends DruidRel<DruidUnnestRel>
   @Override
   public RelWriter explainTerms(RelWriter pw)
   {
-    return pw.item("expr", inputRexNode);
+    return pw.item("expr", inputRexNode).item("filter", unnestFilter);
   }
 
   @Override
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterLTransposeRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterLTransposeRule.java
index 8e2b8d8d72..3fd4baf6f5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterLTransposeRule.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterLTransposeRule.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.core.Filter;
  * Rule that pulls a {@link Filter} from the left-hand side of a {@link Correlate} above the Correlate.
  * Allows subquery elimination.
  *
- * @see CorrelateFilterRTransposeRule similar, but for right-hand side filters
  */
 public class CorrelateFilterLTransposeRule extends RelOptRule
 {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterRTransposeRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterRTransposeRule.java
deleted file mode 100644
index 66731ca78a..0000000000
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/CorrelateFilterRTransposeRule.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.sql.calcite.rule;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-import org.apache.calcite.rex.RexVisitorImpl;
-
-/**
- * Rule that pulls a {@link Filter} from the right-hand side of a {@link Correlate} above the Correlate.
- * Allows filters on unnested fields to be added to queries that use {@link org.apache.druid.query.UnnestDataSource}.
- *
- * @see CorrelateFilterLTransposeRule similar, but for left-hand side filters
- */
-public class CorrelateFilterRTransposeRule extends RelOptRule
-{
-  private static final CorrelateFilterRTransposeRule INSTANCE = new CorrelateFilterRTransposeRule();
-
-  public CorrelateFilterRTransposeRule()
-  {
-    super(
-        operand(
-            Correlate.class,
-            operand(RelNode.class, any()),
-            operand(Filter.class, any())
-        ));
-  }
-
-  public static CorrelateFilterRTransposeRule instance()
-  {
-    return INSTANCE;
-  }
-
-  @Override
-  public boolean matches(RelOptRuleCall call)
-  {
-    final Correlate correlate = call.rel(0);
-    final Filter right = call.rel(2);
-
-    // Can't pull up filters that explicitly refer to the correlation variable.
-    return !usesCorrelationId(correlate.getCorrelationId(), right.getCondition());
-  }
-
-  @Override
-  public void onMatch(final RelOptRuleCall call)
-  {
-    final Correlate correlate = call.rel(0);
-    final RelNode left = call.rel(1);
-    final Filter right = call.rel(2);
-
-    call.transformTo(
-        call.builder()
-            .push(correlate.copy(correlate.getTraitSet(), ImmutableList.of(left, right.getInput())))
-            .filter(RexUtil.shift(right.getCondition(), left.getRowType().getFieldCount()))
-            .build()
-    );
-  }
-
-  /**
-   * Whether an expression refers to correlation variables.
-   */
-  private static boolean usesCorrelationId(final CorrelationId correlationId, final RexNode rexNode)
-  {
-    class CorrelationVisitor extends RexVisitorImpl<Void>
-    {
-      private boolean found = false;
-
-      public CorrelationVisitor()
-      {
-        super(true);
-      }
-
-      @Override
-      public Void visitCorrelVariable(RexCorrelVariable correlVariable)
-      {
-        if (correlVariable.id.equals(correlationId)) {
-          found = true;
-        }
-        return null;
-      }
-    }
-
-    final CorrelationVisitor visitor = new CorrelationVisitor();
-    rexNode.accept(visitor);
-    return visitor.found;
-  }
-}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidFilterUnnestRule.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidFilterUnnestRule.java
new file mode 100644
index 0000000000..c732caaa2a
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidFilterUnnestRule.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.druid.sql.calcite.rel.DruidUnnestRel;
+
+public class DruidFilterUnnestRule extends RelOptRule
+{
+  private static final DruidFilterUnnestRule INSTANCE = new DruidFilterUnnestRule();
+
+  private DruidFilterUnnestRule()
+  {
+    super(
+        operand(
+            Filter.class,
+            operand(DruidUnnestRel.class, any())
+        )
+    );
+  }
+
+  public static DruidFilterUnnestRule instance()
+  {
+    return INSTANCE;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call)
+  {
+    final Filter filter = call.rel(0);
+    final DruidUnnestRel unnestDatasourceRel = call.rel(1);
+    DruidUnnestRel newRel = unnestDatasourceRel.withFilter(filter);
+    call.transformTo(newRel);
+  }
+
+  // This is for a special case of handling selector filters
+  // on top of UnnestDataSourceRel when Calcite adds an extra
+  // LogicalProject on the LogicalFilter. For example, see node 122 in the plan below for
+  // SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b'
+  // 126:LogicalProject(d3=[$17])
+  //  124:LogicalCorrelate(subset=[rel#125:Subset#6.NONE.[]], correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+  //    8:LogicalTableScan(subset=[rel#114:Subset#0.NONE.[]], table=[[druid, numfoo]])
+  //    122:LogicalProject(subset=[rel#123:Subset#5.NONE.[]], d3=[CAST('b':VARCHAR):VARCHAR])
+  //      120:LogicalFilter(subset=[rel#121:Subset#4.NONE.[]], condition=[=($0, 'b')])
+  //        118:Uncollect(subset=[rel#119:Subset#3.NONE.[]])
+  //          116:LogicalProject(subset=[rel#117:Subset#2.NONE.[]], EXPR$0=[MV_TO_ARRAY($cor0.dim3)])
+  //            9:LogicalValues(subset=[rel#115:Subset#1.NONE.[0]], tuples=[[{ 0 }]])
+
+  // This LogicalProject does a type cast only, which Druid already has information about,
+  // so we can skip this LogicalProject only if it is a CAST for strings or a LITERAL for other types.
+  // Extensive unit tests can be found in {@link CalciteArraysQueryTest}.
+
+  static class DruidProjectOnUnnestRule extends RelOptRule
+  {
+    private static final DruidProjectOnUnnestRule INSTANCE = new DruidProjectOnUnnestRule();
+
+    private DruidProjectOnUnnestRule()
+    {
+      super(
+          operand(
+              Project.class,
+              operand(DruidUnnestRel.class, any())
+          )
+      );
+    }
+
+    public static DruidProjectOnUnnestRule instance()
+    {
+      return INSTANCE;
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call)
+    {
+      final Project rightP = call.rel(0);
+      final SqlKind rightProjectKind = rightP.getChildExps().get(0).getKind();
+      // allow rule to trigger only if there's a string CAST or numeric literal cast
+      return rightP.getProjects().size() == 1 && (rightProjectKind == SqlKind.CAST || rightProjectKind == SqlKind.LITERAL);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call)
+    {
+      final DruidUnnestRel unnestDatasourceRel = call.rel(1);
+      call.transformTo(unnestDatasourceRel);
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 519e9365b4..52841fd99a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -120,7 +120,8 @@ public class DruidRules
       retVal.add(new DruidCorrelateUnnestRule(plannerContext));
       retVal.add(ProjectCorrelateTransposeRule.INSTANCE);
       retVal.add(CorrelateFilterLTransposeRule.instance());
-      retVal.add(CorrelateFilterRTransposeRule.instance());
+      retVal.add(DruidFilterUnnestRule.instance());
+      retVal.add(DruidFilterUnnestRule.DruidProjectOnUnnestRule.instance());
     }
 
     return retVal;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index f8976de7b0..80d7a5636c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -2744,7 +2744,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -2798,13 +2799,15 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                                   "j0.unnest",
                                   "string_to_array(\"dim1\",'\\u005C.')",
                                   ColumnType.STRING_ARRAY
-                              )
+                              ),
+                              null
                           ),
                           expressionVirtualColumn(
                               "_j0.unnest",
                               "\"dim3\"",
                               ColumnType.STRING
-                          )
+                          ),
+                          null
                       )
                   )
                   .intervals(querySegmentSpec(Filtration.eternity()))
@@ -2863,13 +2866,15 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                                   "j0.unnest",
                                   "string_to_array(\"dim1\",'\\u005C.')",
                                   ColumnType.STRING_ARRAY
-                              )
+                              ),
+                              in("j0.unnest", ImmutableList.of("1", "2"), null)
                           ),
                           expressionVirtualColumn(
                               "_j0.unnest",
                               "\"dim3\"",
                               ColumnType.STRING
-                          )
+                          ),
+                          new LikeDimFilter("_j0.unnest", "_", null, null)
                       )
                   )
                   .intervals(querySegmentSpec(Filtration.eternity()))
@@ -2890,10 +2895,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                           ColumnType.STRING
                       )
                   )
-                  .filters(and(
-                      in("j0.unnest", ImmutableList.of("1", "2"), null),
-                      new LikeDimFilter("_j0.unnest", "_", null, null)
-                  ))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
@@ -2924,7 +2925,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             GroupByQuery.builder()
                         .setDataSource(UnnestDataSource.create(
                             new TableDataSource(CalciteTests.DATASOURCE3),
-                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                            null
                         ))
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setContext(QUERY_CONTEXT_UNNEST)
@@ -2968,7 +2970,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             GroupByQuery.builder()
                         .setDataSource(UnnestDataSource.create(
                             new TableDataSource(CalciteTests.DATASOURCE3),
-                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                            null
                         ))
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setContext(QUERY_CONTEXT_UNNEST)
@@ -3023,7 +3026,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             new TopNQueryBuilder()
                 .dataSource(UnnestDataSource.create(
                     new TableDataSource(CalciteTests.DATASOURCE3),
-                    expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                    expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                    null
                 ))
                 .intervals(querySegmentSpec(Filtration.eternity()))
                 .dimension(new DefaultDimensionSpec("j0.unnest", "_d0", ColumnType.STRING))
@@ -3061,7 +3065,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             GroupByQuery.builder()
                         .setDataSource(UnnestDataSource.create(
                             new TableDataSource(CalciteTests.DATASOURCE3),
-                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                            expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                            null
                         ))
                         .setInterval(querySegmentSpec(Filtration.eternity()))
                         .setContext(QUERY_CONTEXT_UNNEST)
@@ -3103,7 +3108,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3137,7 +3143,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3198,7 +3205,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                               .context(QUERY_CONTEXT_UNNEST)
                               .build()
                       ),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3247,11 +3255,11 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                               .context(QUERY_CONTEXT_UNNEST)
                               .build()
                       ),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      not(selector("j0.unnest", "b", null))
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .filters(not(selector("j0.unnest", "b", null)))
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
                   .columns(ImmutableList.of("j0.unnest"))
@@ -3280,16 +3288,16 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      or(
+                          new LikeDimFilter("j0.unnest", "_", null, null),
+                          in("j0.unnest", ImmutableList.of("a", "c"), null)
+                      )
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
                   .filters(
                       and(
-                          or(
-                              new LikeDimFilter("j0.unnest", "_", null, null),
-                              in("j0.unnest", ImmutableList.of("a", "c"), null)
-                          ),
                           selector("dim2", "a", null),
                           not(selector("dim1", "foo", null))
                       )
@@ -3322,7 +3330,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .filters(new InDimFilter("dim2", ImmutableList.of("a", "b", "ab", "abc"), null))
@@ -3358,7 +3367,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY)
+                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3401,7 +3411,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                                     "j0.unnest",
                                     "array(\"dim2\",\"dim4\")",
                                     ColumnType.STRING_ARRAY
-                                )
+                                ),
+                                null
                             )
                         )
                         .setInterval(querySegmentSpec(Filtration.eternity()))
@@ -3469,7 +3480,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
                           "(\"dim2\" == \"j0.dim2\")",
                           JoinType.INNER
                       ),
-                      expressionVirtualColumn("_j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("_j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3571,7 +3583,8 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
@@ -3611,17 +3624,18 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
     skipVectorize();
     cannotVectorize();
     testQuery(
-        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3 IN ('a','b')",
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3 IN ('a','b') and m1 < 10",
         QUERY_CONTEXT_UNNEST,
         ImmutableList.of(
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      new InDimFilter("j0.unnest", ImmutableSet.of("a", "b"), null)
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .filters(new InDimFilter("j0.unnest", ImmutableSet.of("a", "b"), null))
+                  .filters(bound("m1", null, "10", false, true, null, StringComparators.NUMERIC))
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
                   .columns(ImmutableList.of("j0.unnest"))
@@ -3647,11 +3661,11 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY)
+                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY),
+                      new InDimFilter("j0.unnest", ImmutableSet.of("a", "b"), null)
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .filters(new InDimFilter("j0.unnest", ImmutableSet.of("a", "b"), null))
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
                   .columns(ImmutableList.of("j0.unnest"))
@@ -3680,11 +3694,11 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      new InDimFilter("j0.unnest", ImmutableSet.of("foo", "bar"), null)
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .filters(new InDimFilter("j0.unnest", ImmutableSet.of("foo", "bar"), null))
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
                   .columns(ImmutableList.of("j0.unnest"))
@@ -3706,11 +3720,11 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
             Druids.newScanQueryBuilder()
                   .dataSource(UnnestDataSource.create(
                       new TableDataSource(CalciteTests.DATASOURCE3),
-                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING)
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      not(selector("j0.unnest", "d", null))
                   ))
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .filters(not(selector("j0.unnest", "d", null)))
                   .legacy(false)
                   .context(QUERY_CONTEXT_UNNEST)
                   .columns(ImmutableList.of("j0.unnest"))
@@ -3737,4 +3751,421 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
         )
     );
   }
+
+  @Test
+  public void testUnnestWithSelectorFiltersOnSelectedColumn()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b'",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      selector("j0.unnest", "b", null)
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b"},
+            new Object[]{"b"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithSelectorFiltersOnVirtualColumn()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d12 FROM druid.numfoo, UNNEST(ARRAY[m1,m2]) as unnested (d12) where d12=1",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "array(\"m1\",\"m2\")", ColumnType.FLOAT_ARRAY),
+                      selector("j0.unnest", "1", null)
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{1.0f},
+            new Object[]{1.0f}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithSelectorFiltersOnVirtualStringColumn()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d45 FROM druid.numfoo, UNNEST(ARRAY[dim4,dim5]) as unnested (d45) where d45 IN ('a','ab')",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY),
+                      new InDimFilter("j0.unnest", ImmutableSet.of("a", "ab"), null)
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"a"},
+            new Object[]{"ab"},
+            new Object[]{"a"},
+            new Object[]{"ab"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleAndFiltersOnSelectedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b' and m1 < 10 and m2 < 10",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      selector("j0.unnest", "b", null)
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .filters(
+                      and(
+                          bound("m1", null, "10", false, true, null, StringComparators.NUMERIC),
+                          bound("m2", null, "10", false, true, null, StringComparators.NUMERIC)
+                      )
+                  )
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b"},
+            new Object[]{"b"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnSelectedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b' or m1 < 2 ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .filters(
+                      or(
+                          selector("j0.unnest", "b", null),
+                          bound("m1", null, "2", false, true, null, StringComparators.NUMERIC)
+                      )
+                  )
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"b"},
+            new Object[]{"b"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleAndFiltersOnSelectedUnnestedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3 IN ('a','b') and d3 < 'e' ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      and(
+                          new InDimFilter("j0.unnest", ImmutableSet.of("a", "b"), null),
+                          bound("j0.unnest", null, "e", false, true, null, StringComparators.LEXICOGRAPHIC)
+                      )
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"b"},
+            new Object[]{"b"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnUnnestedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b' or d3='d' ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      new InDimFilter("j0.unnest", ImmutableSet.of("b", "d"), null)
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b"},
+            new Object[]{"b"},
+            new Object[]{"d"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnVariationsOfUnnestedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where strlen(d3) < 2 or d3='d' ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      or(
+                          new ExpressionDimFilter("(strlen(\"j0.unnest\") < 2)", TestExprMacroTable.INSTANCE),
+                          selector("j0.unnest", "d", null)
+                      )
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        useDefault ?
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"b"},
+            new Object[]{"b"},
+            new Object[]{"c"},
+            new Object[]{"d"},
+            new Object[]{""},
+            new Object[]{""},
+            new Object[]{""}
+        ) :
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"b"},
+            new Object[]{"b"},
+            new Object[]{"c"},
+            new Object[]{"d"},
+            new Object[]{""}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnSelectedNonUnnestedColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where m1 < 2 or m2 < 2 ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .filters(
+                      or(
+                          bound("m1", null, "2", false, true, null, StringComparators.NUMERIC),
+                          bound("m2", null, "2", false, true, null, StringComparators.NUMERIC)
+                      )
+                  )
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"b"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnSelectedVirtualColumns()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d45 FROM druid.numfoo, UNNEST(ARRAY[dim4,dim5]) as unnested (d45) where d45 IN ('a','aa') or m1 < 2 ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "array(\"dim4\",\"dim5\")", ColumnType.STRING_ARRAY),
+                      null
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .filters(
+                      or(
+                          bound("m1", null, "2", false, true, null, StringComparators.NUMERIC),
+                          new InDimFilter("j0.unnest", ImmutableSet.of("a", "aa"), null)
+                      )
+                  )
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a"},
+            new Object[]{"aa"},
+            new Object[]{"a"},
+            new Object[]{"a"},
+            new Object[]{"aa"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnUnnestedColumnsAndOnOriginalColumn()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where d3='b' or dim3='d' ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .filters(
+                      or(
+                          selector("j0.unnest", "b", null),
+                          selector("dim3", "d", null)
+                      )
+                  )
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"b"},
+            new Object[]{"b"},
+            new Object[]{"d"}
+        )
+    );
+  }
+
+  @Test
+  public void testUnnestWithMultipleOrFiltersOnUnnestedColumnsAndOnOriginalColumnDiffOrdering()
+  {
+    skipVectorize();
+    cannotVectorize();
+    testQuery(
+        "SELECT dim3, d3 FROM druid.numfoo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3) where dim3='b' or d3='a' ",
+        QUERY_CONTEXT_UNNEST,
+        ImmutableList.of(
+            Druids.newScanQueryBuilder()
+                  .dataSource(UnnestDataSource.create(
+                      new TableDataSource(CalciteTests.DATASOURCE3),
+                      expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                      null
+                  ))
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                  .filters(
+                      or(
+                          selector("dim3", "b", null),
+                          selector("j0.unnest", "a", null)
+                      )
+                  )
+                  .legacy(false)
+                  .context(QUERY_CONTEXT_UNNEST)
+                  .columns(ImmutableList.of("dim3", "j0.unnest"))
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"[\"a\",\"b\"]", "a"},
+            new Object[]{"[\"a\",\"b\"]", "b"},
+            new Object[]{"[\"b\",\"c\"]", "b"},
+            new Object[]{"[\"b\",\"c\"]", "c"}
+        )
+    );
+  }
 }

