You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/12/16 20:44:58 UTC

[pinot] branch master updated: [multistage] [feature] Support Right join and Full join and inEqui mix. (#9907)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 54046e1547 [multistage] [feature] Support Right join and Full join and inEqui mix. (#9907)
54046e1547 is described below

commit 54046e1547a139a04d62bdf1c365eb6f0166adad
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Fri Dec 16 12:44:51 2022 -0800

    [multistage] [feature] Support Right join and Full join and inEqui mix. (#9907)
---
 .../query/runtime/blocks/TransferableBlock.java    |   8 +
 .../query/runtime/operator/HashJoinOperator.java   | 147 +++++++++++----
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   3 +-
 .../pinot/query/service/QueryDispatcher.java       |   2 -
 .../pinot/query/runtime/QueryRunnerTestBase.java   |   2 +-
 .../runtime/operator/HashJoinOperatorTest.java     | 171 ++++++++++++-----
 .../runtime/queries/ResourceBasedQueriesTest.java  |  22 +--
 .../test/resources/queries/FromExpressions.json    | 206 ++++++++++++++++++++-
 8 files changed, 453 insertions(+), 108 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index ec89f0d1c9..109764bf02 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -146,10 +146,18 @@ public class TransferableBlock implements Block {
    *
    * @return whether this block is the end of stream.
    */
+  // TODO: Update the name to isTerminateBlock.
   public boolean isEndOfStreamBlock() {
     return isType(MetadataBlock.MetadataBlockType.ERROR) || isType(MetadataBlock.MetadataBlockType.EOS);
   }
 
+  /**
+   * @return true when the block is a real end of stream block instead of error block.
+   */
+  public boolean isSuccessfulEndOfStreamBlock() {
+    return isType(MetadataBlock.MetadataBlockType.EOS);
+  }
+
   /**
    * @return whether this block represents a NOOP block
    */
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 89ff2f1abb..7fe3902f61 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -1,3 +1,5 @@
+
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,9 +23,10 @@ package org.apache.pinot.query.runtime.operator;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -40,53 +43,76 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
 import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
 
-
 /**
  * This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm.
+ * This algorithm assumes that the broadcast table has to fit in memory since we are not supporting any spilling.
  *
+ * For left join, inner join, right join and full join,
  * <p>It takes the right table as the broadcast side and materialize a hash table. Then for each of the left table row,
  * it looks up for the corresponding row(s) from the hash table and create a joint row.
  *
  * <p>For each of the data block received from the left table, it will generate a joint data block.
- *
- * We currently support left join, inner join and semi join.
+ * We currently support left join, inner join, right join and full join.
  * The output is in the format of [left_row, right_row]
  */
 // TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
 public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "HASH_JOIN";
-  private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES = ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT);
-  private final HashMap<Key, List<Object[]>> _broadcastHashTable;
+  private static final Set<JoinRelType> SUPPORTED_JOIN_TYPES =
+      ImmutableSet.of(JoinRelType.INNER, JoinRelType.LEFT, JoinRelType.RIGHT, JoinRelType.FULL);
+
+  private final HashMap<Key, List<Object[]>> _broadcastRightTable;
+
+  // Used to track matched right rows.
+  // Only used for right join and full join to output non-matched right rows.
+  // TODO: Replace hashset with rolling bit map.
+  private final HashMap<Key, HashSet<Integer>> _matchedRightRows;
+
   private final Operator<TransferableBlock> _leftTableOperator;
   private final Operator<TransferableBlock> _rightTableOperator;
   private final JoinRelType _joinType;
   private final DataSchema _resultSchema;
+  private final int _leftRowSize;
   private final int _resultRowSize;
   private final List<TransformOperand> _joinClauseEvaluators;
   private boolean _isHashTableBuilt;
+
+  // Used by non-inner join.
+  // Needed to indicate we have finished processing all results after returning last block.
+  // TODO: Remove this special handling by fixing data block EOS abstraction or operator's invariant.
+  private boolean _isTerminated;
   private TransferableBlock _upstreamErrorBlock;
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
   public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, Operator<TransferableBlock> rightTableOperator,
-      DataSchema outputSchema, JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) {
-    Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(joinType),
-        "Join type: " + joinType + " is not supported!");
-    _leftKeySelector = joinKeys.getLeftJoinKeySelector();
-    _rightKeySelector = joinKeys.getRightJoinKeySelector();
+      DataSchema leftSchema, JoinNode node) {
+    Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()),
+        "Join type: " + node.getJoinRelType() + " is not supported!");
+    _joinType = node.getJoinRelType();
+    _leftKeySelector = node.getJoinKeys().getLeftJoinKeySelector();
+    _rightKeySelector = node.getJoinKeys().getRightJoinKeySelector();
     Preconditions.checkState(_leftKeySelector != null, "LeftKeySelector for join cannot be null");
     Preconditions.checkState(_rightKeySelector != null, "RightKeySelector for join cannot be null");
+    _leftRowSize = leftSchema.size();
+    Preconditions.checkState(_leftRowSize > 0, "leftRowSize has to be greater than zero:" + _leftRowSize);
+    _resultSchema = node.getDataSchema();
+    _resultRowSize = _resultSchema.size();
+    Preconditions.checkState(_resultRowSize > _leftRowSize,
+        "Result row size" + _leftRowSize + " has to be greater than left row size:" + _leftRowSize);
     _leftTableOperator = leftTableOperator;
     _rightTableOperator = rightTableOperator;
-    _resultSchema = outputSchema;
-    _joinClauseEvaluators = new ArrayList<>(joinClauses.size());
-    for (RexExpression joinClause : joinClauses) {
+    _joinClauseEvaluators = new ArrayList<>(node.getJoinClauses().size());
+    for (RexExpression joinClause : node.getJoinClauses()) {
       _joinClauseEvaluators.add(TransformOperand.toTransformOperand(joinClause, _resultSchema));
     }
-    _joinType = joinType;
-    _resultRowSize = _resultSchema.size();
     _isHashTableBuilt = false;
-    _broadcastHashTable = new HashMap<>();
+    _broadcastRightTable = new HashMap<>();
+    if (needUnmatchedRightRows()) {
+      _matchedRightRows = new HashMap<>();
+    } else {
+      _matchedRightRows = null;
+    }
     _upstreamErrorBlock = null;
   }
 
@@ -105,6 +131,9 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   @Override
   protected TransferableBlock getNextBlock() {
     try {
+      if (_isTerminated) {
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      }
       if (!_isHashTableBuilt) {
         // Build JOIN hash table
         buildBroadcastHashTable();
@@ -124,22 +153,19 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private void buildBroadcastHashTable() {
     TransferableBlock rightBlock = _rightTableOperator.nextBlock();
     while (!rightBlock.isNoOpBlock()) {
-
       if (rightBlock.isErrorBlock()) {
         _upstreamErrorBlock = rightBlock;
         return;
       }
-
       if (TransferableBlockUtils.isEndOfStream(rightBlock)) {
         _isHashTableBuilt = true;
         return;
       }
-
       List<Object[]> container = rightBlock.getContainer();
       // put all the rows into corresponding hash collections keyed by the key selector function.
       for (Object[] row : container) {
-        List<Object[]> hashCollection = _broadcastHashTable.computeIfAbsent(
-            new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
+        List<Object[]> hashCollection =
+            _broadcastRightTable.computeIfAbsent(new Key(_rightKeySelector.getKey(row)), k -> new ArrayList<>());
         hashCollection.add(row);
       }
 
@@ -152,39 +178,74 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     if (leftBlock.isErrorBlock()) {
       _upstreamErrorBlock = leftBlock;
       return _upstreamErrorBlock;
-    } else if (TransferableBlockUtils.isNoOpBlock(leftBlock) || TransferableBlockUtils.isEndOfStream(leftBlock)) {
+    }
+    if (leftBlock.isNoOpBlock() || (leftBlock.isSuccessfulEndOfStreamBlock() && !needUnmatchedRightRows())) {
       return leftBlock;
     }
+    // TODO: Moved to a different function.
+    if (leftBlock.isSuccessfulEndOfStreamBlock() && needUnmatchedRightRows()) {
+      // Return remaining non-matched rows for non-inner join.
+      List<Object[]> returnRows = new ArrayList<>();
+      for (Map.Entry<Key, List<Object[]>> entry : _broadcastRightTable.entrySet()) {
+        Set<Integer> matchedIdx = _matchedRightRows.getOrDefault(entry.getKey(), new HashSet<>());
+        List<Object[]> rightRows = entry.getValue();
+        if (rightRows.size() == matchedIdx.size()) {
+          continue;
+        }
+        for (int i = 0; i < rightRows.size(); i++) {
+          if (!matchedIdx.contains(i)) {
+            returnRows.add(joinRow(null, rightRows.get(i)));
+          }
+        }
+      }
+      _isTerminated = true;
+      return new TransferableBlock(returnRows, _resultSchema, DataBlock.Type.ROW);
+    }
     List<Object[]> rows = new ArrayList<>();
     List<Object[]> container = leftBlock.isEndOfStreamBlock() ? new ArrayList<>() : leftBlock.getContainer();
     for (Object[] leftRow : container) {
+      Key key = new Key(_leftKeySelector.getKey(leftRow));
       // NOTE: Empty key selector will always give same hash code.
-      List<Object[]> hashCollection =
-          _broadcastHashTable.getOrDefault(new Key(_leftKeySelector.getKey(leftRow)), Collections.emptyList());
-      // If it is a left join and right table is empty, we return left rows.
-      if (hashCollection.isEmpty() && _joinType == JoinRelType.LEFT) {
-        rows.add(joinRow(leftRow, null));
-      } else {
-        // If it is other type of join.
-        for (Object[] rightRow : hashCollection) {
-          // TODO: Optimize this to avoid unnecessary object copy.
-          Object[] resultRow = joinRow(leftRow, rightRow);
-          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(evaluator ->
-              (Boolean) FunctionInvokeUtils.convert(evaluator.apply(resultRow), DataSchema.ColumnDataType.BOOLEAN))) {
-            rows.add(resultRow);
+      List<Object[]> matchedRightRows = _broadcastRightTable.getOrDefault(key, null);
+      if (matchedRightRows == null) {
+        if (needUnmatchedLeftRows()) {
+          rows.add(joinRow(leftRow, null));
+        }
+        continue;
+      }
+      boolean hasMatchForLeftRow = false;
+      for (int i = 0; i < matchedRightRows.size(); i++) {
+        Object[] rightRow = matchedRightRows.get(i);
+        // TODO: Optimize this to avoid unnecessary object copy.
+        Object[] resultRow = joinRow(leftRow, rightRow);
+        if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+            evaluator -> (Boolean) FunctionInvokeUtils.convert(evaluator.apply(resultRow),
+                DataSchema.ColumnDataType.BOOLEAN))) {
+          rows.add(resultRow);
+          hasMatchForLeftRow = true;
+          if (_matchedRightRows != null) {
+            HashSet<Integer> matchedRows = _matchedRightRows.computeIfAbsent(key, k -> new HashSet<>());
+            matchedRows.add(i);
           }
         }
       }
+      if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
+        rows.add(joinRow(leftRow, null));
+      }
     }
     return new TransferableBlock(rows, _resultSchema, DataBlock.Type.ROW);
   }
 
-  private Object[] joinRow(Object[] leftRow, @Nullable Object[] rightRow) {
+  private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) {
     Object[] resultRow = new Object[_resultRowSize];
     int idx = 0;
-    for (Object obj : leftRow) {
-      resultRow[idx++] = obj;
+    if (leftRow != null) {
+      for (Object obj : leftRow) {
+        resultRow[idx++] = obj;
+      }
     }
+    // This is needed since left row can be null and we need to advance the idx to the beginning of right row.
+    idx = _leftRowSize;
     if (rightRow != null) {
       for (Object obj : rightRow) {
         resultRow[idx++] = obj;
@@ -192,4 +253,12 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
     }
     return resultRow;
   }
+
+  private boolean needUnmatchedRightRows() {
+    return _joinType == JoinRelType.RIGHT || _joinType == JoinRelType.FULL;
+  }
+
+  private boolean needUnmatchedLeftRows() {
+    return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 6754b675af..2ba2cc7afc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -102,8 +102,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab
     Operator<TransferableBlock> leftOperator = left.visit(this, context);
     Operator<TransferableBlock> rightOperator = right.visit(this, context);
 
-    return new HashJoinOperator(leftOperator, rightOperator, node.getDataSchema(), node.getJoinKeys(),
-        node.getJoinClauses(), node.getJoinRelType());
+    return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index f22b83773d..e29acb7206 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -129,10 +129,8 @@ public class QueryDispatcher {
       } else if (transferableBlock.isEndOfStreamBlock()) {
         return resultDataBlocks;
       }
-
       resultDataBlocks.add(transferableBlock.getDataBlock());
     }
-
     throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 65fe73e29d..090f4ded6f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -351,7 +351,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       @JsonProperty("description")
       public String _description;
       @JsonProperty("outputs")
-      public List<List<Object>> _outputs = Collections.emptyList();
+      public List<List<Object>> _outputs = null;
       @JsonProperty("expectedException")
       public String _expectedException;
     }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index 762d28fdfa..0a3ae48974 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -89,8 +89,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+    HashJoinOperator joinOnString = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = joinOnString.nextBlock();
     while (result.isNoOpBlock()) {
@@ -125,8 +126,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -158,8 +160,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
+    JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()),
+        joinClauses);
+    HashJoinOperator joinOnInt = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
     while (result.isNoOpBlock()) {
       result = joinOnInt.nextBlock();
@@ -198,8 +201,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.LEFT);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -214,6 +218,9 @@ public class HashJoinOperatorTest {
 
   @Test
   public void shouldPassLeftTableEOS() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
@@ -222,14 +229,15 @@ public class HashJoinOperatorTest {
             OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}, new Object[]{1, "CC"}, new Object[]{3, "BB"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
-    List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"int_col1", "string_col1", "int_co2", "string_col2"},
         new DataSchema.ColumnDataType[]{
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    List<RexExpression> joinClauses = new ArrayList<>();
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -258,8 +266,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.LEFT);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -274,6 +283,9 @@ public class HashJoinOperatorTest {
 
   @Test
   public void shouldPassRightTableEOS() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
@@ -288,8 +300,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -325,9 +338,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
-
+    JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()),
+        joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -363,9 +376,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, JoinRelType.INNER);
-
+    JoinNode node = new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()),
+        joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
       result = join.nextBlock();
@@ -377,9 +390,8 @@ public class HashJoinOperatorTest {
     Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RIGHT is not supported"
-      + ".*")
-  public void shouldThrowOnRightJoin() {
+  @Test
+  public void shouldHandleRightJoin() {
     DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
@@ -398,8 +410,33 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.RIGHT);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.RIGHT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator joinOnNum = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
+    TransferableBlock result = joinOnNum.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnNum.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = Arrays.asList(new Object[]{2, "BB", 2, "Aa"}, new Object[]{2, "BB", 2, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    // Second block should be non-matched broadcast rows
+    result = joinOnNum.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnNum.nextBlock();
+    }
+    resultRows = result.getContainer();
+    expectedRows = ImmutableList.of(new Object[]{null, null, 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    // Third block is EOS block.
+    result = joinOnNum.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = joinOnNum.nextBlock();
+    }
+    Assert.assertTrue(result.isSuccessfulEndOfStreamBlock());
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*SEMI is not "
@@ -423,32 +460,60 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.SEMI);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.SEMI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*FULL is not supported.*")
-  public void shouldThrowOnFullJoin() {
+  @Test
+  public void shouldHandleFullJoin() {
     DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    Mockito.when(_leftOperator.nextBlock())
-        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}))
+    Mockito.when(_leftOperator.nextBlock()).thenReturn(
+            OperatorTestUtil.block(leftSchema, new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{4, "CC"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     Mockito.when(_rightOperator.nextBlock()).thenReturn(
             OperatorTestUtil.block(rightSchema, new Object[]{2, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
     List<RexExpression> joinClauses = new ArrayList<>();
     DataSchema resultSchema = new DataSchema(new String[]{"foo", "bar", "foo", "bar"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.FULL);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.FULL, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
+    TransferableBlock result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    List<Object[]> resultRows = result.getContainer();
+    List<Object[]> expectedRows = ImmutableList.of(new Object[]{1, "Aa", null, null}, new Object[]{2, "BB", 2, "Aa"},
+        new Object[]{2, "BB", 2, "BB"}, new Object[]{4, "CC", null, null});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    Assert.assertEquals(resultRows.get(1), expectedRows.get(1));
+    Assert.assertEquals(resultRows.get(2), expectedRows.get(2));
+    Assert.assertEquals(resultRows.get(3), expectedRows.get(3));
+    // Second block should be non-matched broadcast rows
+    result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    resultRows = result.getContainer();
+    expectedRows = ImmutableList.of(new Object[]{null, null, 3, "BB"});
+    Assert.assertEquals(resultRows.size(), expectedRows.size());
+    Assert.assertEquals(resultRows.get(0), expectedRows.get(0));
+    // Third block is EOS block.
+    result = join.nextBlock();
+    while (result.isNoOpBlock()) {
+      result = join.nextBlock();
+    }
+    Assert.assertTrue(result.isSuccessfulEndOfStreamBlock());
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*ANTI is not supported.*")
@@ -471,12 +536,16 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
         DataSchema.ColumnDataType.STRING
     });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, JoinRelType.ANTI);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.ANTI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
   }
 
   @Test
   public void shouldPropagateRightTableError() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
@@ -492,8 +561,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -506,6 +576,9 @@ public class HashJoinOperatorTest {
 
   @Test
   public void shouldPropagateLeftTableError() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
@@ -521,8 +594,9 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock();
     while (result.isNoOpBlock()) {
@@ -535,12 +609,15 @@ public class HashJoinOperatorTest {
 
   @Test
   public void shouldHandleNoOpBlock() {
+    DataSchema leftSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
     DataSchema rightSchema = new DataSchema(new String[]{"int_col", "string_col"}, new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
-    Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "BB"}))
+    Mockito.when(_leftOperator.nextBlock()).thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{2, "BB"}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
-        .thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{2, "CC"}))
+        .thenReturn(OperatorTestUtil.block(leftSchema, new Object[]{2, "CC"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     Mockito.when(_rightOperator.nextBlock()).thenReturn(OperatorTestUtil.block(rightSchema, new Object[]{1, "BB"}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
@@ -553,14 +630,15 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.STRING
         });
-    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, resultSchema,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, JoinRelType.INNER);
+    JoinNode node =
+        new JoinNode(1, resultSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
+    HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node);
 
     TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block.
     Assert.assertTrue(result.isNoOpBlock());
     result = join.nextBlock(); // second no-op consumes no-op right block.
     Assert.assertTrue(result.isNoOpBlock());
-    result = join.nextBlock(); // third -op consumes two right data blocks and builds result
+    result = join.nextBlock(); // third no-op consumes two right data blocks and builds result
     List<Object[]> resultRows = result.getContainer();
     List<Object[]> expectedRows = ImmutableList.of(new Object[]{2, "BB", 2, "Aa"});
     Assert.assertEquals(resultRows.size(), expectedRows.size());
@@ -576,3 +654,4 @@ public class HashJoinOperatorTest {
     Assert.assertTrue(result.isEndOfStreamBlock());
   }
 }
+// TODO: Add more inequi join tests.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 78553b5e1b..a5419afd80 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -129,10 +129,10 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
         factory2.addSegment(tableNameWithType, rows2);
       }
 
-      boolean anyHaveOutput = testCase._queries.stream().anyMatch(q -> q._outputs != null && !q._outputs.isEmpty());
+      boolean anyHaveOutput = testCase._queries.stream().anyMatch(q -> q._outputs != null);
 
       if (anyHaveOutput) {
-        boolean allHaveOutput = testCase._queries.stream().allMatch(q -> q._outputs != null && !q._outputs.isEmpty());
+        boolean allHaveOutput = testCase._queries.stream().allMatch(q -> q._outputs != null);
         if (!allHaveOutput) {
           throw new IllegalArgumentException("Cannot support one test where some queries require H2 and others don't");
         }
@@ -161,9 +161,9 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
         ignored -> { });
     _mailboxService.start();
 
-    _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(),
-        server2.getPort(), factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(),
-        factory2.buildTableSegmentNameMap());
+    _queryEnvironment =
+        QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort(),
+            factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap());
     server1.start();
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
@@ -209,8 +209,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       List<Object[]> resultRows = queryRunner(sql);
 
       Assert.assertNull(except,
-        "Expected error with message '" + except + "'. But instead rows were returned: "
-            + resultRows.stream().map(Arrays::toString).collect(Collectors.joining(",\n")));
+          "Expected error with message '" + except + "'. But instead rows were returned: " + resultRows.stream()
+              .map(Arrays::toString).collect(Collectors.joining(",\n")));
 
       return Optional.of(resultRows);
     } catch (Exception e) {
@@ -219,8 +219,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       } else {
         Pattern pattern = Pattern.compile(except);
         Assert.assertTrue(pattern.matcher(e.getMessage()).matches(),
-            String.format("Caught exception '%s', but it did not match the expected pattern '%s'.",
-                e.getMessage(), except));
+            String.format("Caught exception '%s', but it did not match the expected pattern '%s'.", e.getMessage(),
+                except));
       }
     }
 
@@ -244,7 +244,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           continue;
         }
 
-        if (queryCase._outputs != null && !queryCase._outputs.isEmpty()) {
+        if (queryCase._outputs != null) {
           String sql = replaceTableName(testCaseName, queryCase._sql);
           List<List<Object>> orgRows = queryCase._outputs;
           List<Object[]> expectedRows = new ArrayList<>(orgRows.size());
@@ -275,7 +275,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
         if (queryCase._ignored) {
           continue;
         }
-        if (queryCase._outputs == null || queryCase._outputs.isEmpty()) {
+        if (queryCase._outputs == null) {
           String sql = replaceTableName(testCaseName, queryCase._sql);
           Object[] testEntry = new Object[]{testCaseName, sql, queryCase._expectedException};
           providerContent.add(testEntry);
diff --git a/pinot-query-runtime/src/test/resources/queries/FromExpressions.json b/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
index 94a82b8194..70375ec8a4 100644
--- a/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
+++ b/pinot-query-runtime/src/test/resources/queries/FromExpressions.json
@@ -8,8 +8,19 @@
         ],
         "inputs": [
           [1, "a"],
+          [1, "a"],
+          [1, "b"],
+          [1, "xxx"],
           [2, "b"],
-          [3, "c"]
+          [3, "c"],
+          [3, "d"],
+          [4, "d"],
+          [6, "e"],
+          [7, "a"],
+          [7, "a"],
+          [7, "c"],
+          [8, "a"],
+          [8, "b"]
         ]
       },
       "tbl2" : {
@@ -19,8 +30,17 @@
         ],
         "inputs": [
           [1, "xxx"],
+          [1, "zzz"],
           [3, "yyy"],
-          [5, "zzz"]
+          [3, "c"],
+          [4, "d"],
+          [5, "zzz"],
+          [7, "a"],
+          [7, "a"],
+          [7, "c"],
+          [8, "a"],
+          [8, "a"],
+          [8, "c"]
         ]
       }
     },
@@ -50,6 +70,16 @@
         "description": "LEFT OUTER JOIN with non-join related clause",
         "sql": "SELECT * FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num AND {tbl2}.value = 'xxx'"
       },
+      {
+        "psql": "7.2.1.1",
+        "description": "LEFT OUTER JOIN with inequality condition",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl2} LEFT OUTER JOIN {tbl1} ON {tbl1}.num = {tbl2}.num AND {tbl1}.name != {tbl2}.value"
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "LEFT OUTER JOIN with inequality condition",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl2} LEFT OUTER JOIN {tbl1} ON {tbl1}.name = {tbl2}.value AND {tbl1}.num > {tbl2}.num + 1"
+      },
       {
         "psql": "7.2.1.1",
         "description": "CROSS JOIN",
@@ -57,15 +87,29 @@
       },
       {
         "psql": "7.2.1.1",
-        "ignored": true,
+        "comments": "select all doesn't work because h2 output the rows in different order from Pinot and test framework doesn't consider the column order for rows and blindly assume they are in the same order",
         "description": "RIGHT OUTER JOIN",
-        "sql": "SELECT * FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.col1 = {tbl2}.col1"
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.value FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
       },
       {
         "psql": "7.2.1.1",
-        "ignored": true,
-        "description": "FULL OUTER JOIN",
-        "sql": "SELECT * FROM {tbl1} FULL OUTER JOIN {tbl2} ON {tbl1}.col1 = {tbl2}.col1"
+        "description": "RIGHT OUTER JOIN output rows in different order than join order",
+        "sql": "SELECT {tbl2}.num, {tbl2}.value, {tbl1}.num, {tbl1}.name FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "RIGHT OUTER JOIN output rows in mixed table order",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num"
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "RIGHT OUTER JOIN with inequality condition",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num AND {tbl1}.name != {tbl2}.value"
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "RIGHT OUTER JOIN with inequality condition",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl1} RIGHT OUTER JOIN {tbl2} ON {tbl1}.name = {tbl2}.value AND {tbl1}.num > {tbl2}.num + 1"
       },
       {
         "psql": "7.2.1.1",
@@ -104,6 +148,142 @@
       }
     ]
   },
+  "basic_full_join_queries": {
+    "tables": {
+      "tbl1" : {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "name", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "a"],
+          [2, "b"],
+          [3, "c"],
+          [3, "yyy"],
+          [4, "e"],
+          [4, "e"],
+          [6, "e"],
+          [7, "d"],
+          [7, "f"],
+          [8, "z"]
+        ]
+      },
+      "tbl2" : {
+        "schema": [
+          {"name": "num", "type": "INT"},
+          {"name": "value", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "xxx"],
+          [1, "xxx"],
+          [3, "yyy"],
+          [3, "zzz"],
+          [5, "zzz"],
+          [6, "e"],
+          [7, "d"],
+          [8, "z"]
+        ]
+      },
+      "tbl_empty" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "strCol2", "type": "STRING"}
+        ],
+        "inputs": [ ]
+      },
+      "tbl_empty2" : {
+        "schema": [
+          {"name": "strCol1", "type": "STRING"},
+          {"name": "intCol1", "type": "INT"},
+          {"name": "strCol2", "type": "STRING"}
+        ],
+        "inputs": [ ]
+      }
+    },
+    "queries": [
+      {
+        "psql": "7.2.1.1",
+        "description": "FULL OUTER JOIN",
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.value FROM {tbl1} FULL JOIN {tbl2} ON {tbl1}.num = {tbl2}.num",
+        "outputs": [
+          [1, "a", 1, "xxx"],
+          [1, "a", 1, "xxx"],
+          [2, "b", null, null],
+          [3, "c", 3, "yyy"],
+          [3, "c", 3, "zzz"],
+          [3, "yyy", 3, "yyy"],
+          [3, "yyy", 3, "zzz"],
+          [4, "e", null, null],
+          [4, "e", null, null],
+          [null, null, 5, "zzz"],
+          [6, "e", 6, "e"],
+          [7, "d", 7, "d"],
+          [7, "f", 7, "d"],
+          [8, "z", 8, "z"]
+        ]
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "FULL OUTER JOIN with left table empty",
+        "sql": "SELECT {tbl_empty}.strCol1, {tbl_empty}.intCol1, {tbl_empty}.strCol2, {tbl2}.num, {tbl2}.value FROM {tbl_empty} FULL JOIN {tbl2} ON {tbl_empty}.intCol1 = {tbl2}.num",
+        "outputs": [
+          [null, null, null, 1, "xxx"],
+          [null, null, null, 1, "xxx"],
+          [null, null, null, 3, "yyy"],
+          [null, null, null, 3, "zzz"],
+          [null, null, null, 5, "zzz"],
+          [null, null, null, 6, "e"],
+          [null, null, null, 7, "d"],
+          [null, null, null, 8, "z"]
+        ]
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "FULL OUTER JOIN with right table empty",
+        "sql": "SELECT {tbl_empty}.strCol1, {tbl_empty}.intCol1, {tbl_empty}.strCol2, {tbl2}.num, {tbl2}.value FROM {tbl2} FULL JOIN {tbl_empty} ON {tbl_empty}.intCol1 = {tbl2}.num",
+        "outputs": [
+          [null, null, null, 1, "xxx"],
+          [null, null, null, 1, "xxx"],
+          [null, null, null, 3, "yyy"],
+          [null, null, null, 3, "zzz"],
+          [null, null, null, 5, "zzz"],
+          [null, null, null, 6, "e"],
+          [null, null, null, 7, "d"],
+          [null, null, null, 8, "z"]
+        ]
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "FULL OUTER JOIN with inequality condition",
+        "sql": "SELECT {tbl1}.num, {tbl2}.num, {tbl1}.name, {tbl2}.value FROM {tbl1} FULL OUTER JOIN {tbl2} ON {tbl1}.num = {tbl2}.num AND {tbl1}.name != {tbl2}.value",
+        "outputs": [
+          [1,  1, "a", "xxx"],
+          [1,  1, "a", "xxx"],
+          [2,  null, "b", null],
+          [3, 3, "c", "zzz"],
+          [3, 3, "c", "yyy"],
+          [3, 3, "yyy", "zzz"],
+          [4, null, "e", null],
+          [4, null, "e", null],
+          [6, null, "e", null],
+          [7, null, "d", null],
+          [7, 7, "f", "d"],
+          [8, null, "z", null],
+          [null, 8, null, "z"],
+          [null, 6, null, "e"],
+          [null, 5, null, "zzz"]
+        ]
+      },
+      {
+        "psql": "7.2.1.1",
+        "description": "FULL OUTER JOIN with both tables empty",
+        "sql": "SELECT {tbl_empty}.strCol1, {tbl_empty}.intCol1, {tbl_empty}.strCol2, {tbl_empty2}.strCol1, {tbl_empty2}.intCol1, {tbl_empty2}.strCol2 FROM {tbl_empty} FULL JOIN {tbl_empty2} ON {tbl_empty}.intCol1 = {tbl_empty2}.intCol1",
+        "outputs": [
+        ]
+      }
+    ]
+  },
   "extended_join_features": {
     "tables": {
       "tbl1" : {
@@ -244,6 +424,18 @@
       {
         "description": "join with an empty left table using LEFT OUTER",
         "sql": "SELECT * FROM {tbl_empty} LEFT JOIN {tbl1} ON {tbl1}.intCol1 = {tbl_empty}.intCol1"
+      },
+      {
+        "description": "join with an empty right table using RIGHT OUTER",
+        "sql": "SELECT {tbl1}.strCol1, {tbl1}.intCol1, {tbl1}.strCol2, {tbl_empty}.strCol1, {tbl_empty}.intCol1, {tbl_empty}.strCol2 FROM {tbl1} RIGHT JOIN {tbl_empty} ON {tbl1}.intCol1 = {tbl_empty}.intCol1"
+      },
+      {
+        "description": "join with an empty left table using RIGHT OUTER",
+        "sql": "SELECT {tbl1}.strCol1, {tbl1}.intCol1, {tbl1}.strCol2, {tbl_empty}.strCol1, {tbl_empty}.intCol1, {tbl_empty}.strCol2 FROM {tbl_empty} RIGHT JOIN {tbl1}  ON {tbl1}.intCol1 = {tbl_empty}.intCol1"
+      },
+      {
+        "description": "join with both left and right empty table using RIGHT OUTER",
+        "sql": "SELECT * FROM {tbl_empty} as a RIGHT JOIN {tbl_empty} as b ON a.intCol1 = b.intCol1"
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org