You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/04/15 21:24:45 UTC

[pinot] branch multi_stage_query_engine updated: Use proto for query plan serialization (#8479)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/multi_stage_query_engine by this push:
     new 791248a318 Use proto for query plan serialization (#8479)
791248a318 is described below

commit 791248a3188d70bad6d3d69e4580dddbfe206c16
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Apr 15 14:24:38 2022 -0700

    Use proto for query plan serialization (#8479)
    
    * serializable format
    
    * fix linter
    
    * bump version since feature branch based bumped
    
    * fix auto-rebase error
    
    * fix test async mock intercept issue
    
    * use reflection for ser/de
    
    * also fix test coverage on all node types
    
    * fix java8
    
    * fix JOINNode member type as well as plan.proto comments
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/common/config/provider/TableCache.java   |  10 -
 .../src/main/proto/{worker.proto => plan.proto}    |  66 ++++---
 pinot-common/src/main/proto/worker.proto           |   4 +-
 pinot-query-planner/pom.xml                        |   2 +-
 .../pinot/query/planner/RelToStageConverter.java   |   5 +-
 .../query/planner/nodes/AbstractStageNode.java     |  15 +-
 .../apache/pinot/query/planner/nodes/CalcNode.java |   7 +-
 .../apache/pinot/query/planner/nodes/JoinNode.java |  21 ++-
 .../query/planner/nodes/MailboxReceiveNode.java    |   8 +-
 .../pinot/query/planner/nodes/MailboxSendNode.java |   8 +-
 .../pinot/query/planner/nodes/SerDeUtils.java      |  65 +++++++
 .../pinot/query/planner/nodes/TableScanNode.java   |   8 +-
 .../ProtoSerializable.java}                        |  22 ++-
 .../nodes/serde/ProtoSerializationUtils.java       | 209 +++++++++++++++++++++
 .../partitioning/FieldSelectionKeySelector.java    |   7 +
 pinot-query-runtime/pom.xml                        |   2 +-
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    |   6 +-
 .../runtime/plan/serde/StageNodeSerDeUtils.java    |  56 ------
 .../pinot/query/service/QueryServerTest.java       |  66 ++++++-
 19 files changed, 458 insertions(+), 129 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 8d3499c653..1f5d286608 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -217,16 +217,6 @@ public class TableCache implements PinotConfigProvider {
     }
   }
 
-  /**
-   * Return a map between lower-case table name and their canonicalized form. Key-value pair are only different in
-   * case-sensitive environment.
-   *
-   * @return the table name map.
-   */
-  public Map<String, String> getTableNameMap() {
-    return _tableNameMap;
-  }
-
   private void addTableConfigs(List<String> paths) {
     // Subscribe data changes before reading the data to avoid missing changes
     for (String path : paths) {
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/plan.proto
similarity index 54%
copy from pinot-common/src/main/proto/worker.proto
copy to pinot-common/src/main/proto/plan.proto
index c64798fa63..47018197fc 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -39,36 +39,56 @@ syntax = "proto3";
 
 package org.apache.pinot.common.proto;
 
-service PinotQueryWorker {
-  // Dispatch a QueryRequest to a PinotQueryWorker
-  rpc Submit(QueryRequest) returns (QueryResponse);
+message StageNode {
+  int32 stageId = 1;
+  string nodeName = 2;
+  repeated StageNode inputs = 3;
+  ObjectField objectField = 4;
 }
 
-// QueryRequest is the dispatched content for a specific query stage on a specific worker.
-message QueryRequest {
-  map<string, string> metadata = 1;
-  StagePlan stagePlan = 2;
+// MemberVariableField defines the serialized format of the member variables of a class object.
+// MemberVariableField can be one of
+//   1. literal
+//   2. list
+//   3. map
+//   4. complex class object
+message MemberVariableField {
+  oneof member_variable_field {
+    LiteralField literalField = 1;
+    ListField listField = 2;
+    MapField mapField = 3;
+    ObjectField objectField = 4;
+  }
 }
 
-// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
-message QueryResponse {
-  map<string, string> metadata = 1;
-  bytes payload = 2;
+// ObjectField defines the serialized format of a complex class object.
+// It contains:
+//   1. its fully-qualified class name;
+//   2. its MemberVariableField map.
+message ObjectField {
+  string objectClassName = 1;
+  map<string, MemberVariableField> memberVariables = 2;
 }
 
-message StagePlan {
-  int32 stageId = 1;
-  string instanceId = 2;
-  bytes serializedStageRoot = 3;
-  map<int32, StageMetadata> stageMetadata = 4;
+// LiteralField defines the serialized format of a literal field.
+message LiteralField {
+  oneof literal_field {
+    bool boolField = 1;
+    int32 intField = 2;
+    int64 longField = 3;
+    double doubleField = 4;
+    string stringField = 5;
+  }
 }
 
-message StageMetadata {
-  repeated string instances = 1;
-  repeated string dataSources = 2;
-  map<string, SegmentList> instanceToSegmentList = 3;
+// ListField defines the serialized format of a list field.
+// The content of the list is a MemberVariableField.
+message ListField {
+  repeated MemberVariableField content = 1;
 }
 
-message SegmentList {
-  repeated string segments = 1;
-}
+// MapField defines the serialized format of a map field.
+// The key of the map is a string and the value of the map is a MemberVariableField.
+message MapField {
+  map<string, MemberVariableField> content = 1;
+}
\ No newline at end of file
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index c64798fa63..87aecc8391 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -39,6 +39,8 @@ syntax = "proto3";
 
 package org.apache.pinot.common.proto;
 
+import "plan.proto";
+
 service PinotQueryWorker {
   // Dispatch a QueryRequest to a PinotQueryWorker
   rpc Submit(QueryRequest) returns (QueryResponse);
@@ -59,7 +61,7 @@ message QueryResponse {
 message StagePlan {
   int32 stageId = 1;
   string instanceId = 2;
-  bytes serializedStageRoot = 3;
+  StageNode stageRoot = 3;
   map<int32, StageMetadata> stageMetadata = 4;
 }
 
diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml
index 8d1af64a19..05b9461cdc 100644
--- a/pinot-query-planner/pom.xml
+++ b/pinot-query-planner/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-query-planner</artifactId>
   <name>Pinot Query Planner</name>
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
index e558694aab..572302ef92 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
@@ -37,7 +37,6 @@ import org.apache.pinot.query.planner.nodes.JoinNode;
 import org.apache.pinot.query.planner.nodes.StageNode;
 import org.apache.pinot.query.planner.nodes.TableScanNode;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 /**
@@ -93,8 +92,8 @@ public final class RelToStageConverter {
     RelDataType rightRowType = node.getRight().getRowType();
     int leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex();
     int rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex();
-    KeySelector<Object[], Object> leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex);
-    KeySelector<Object[], Object> rightFieldSelectionKeySelector =
+    FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex);
+    FieldSelectionKeySelector rightFieldSelectionKeySelector =
           new FieldSelectionKeySelector(rightOperandIndex - leftRowType.getFieldNames().size());
     return new JoinNode(currentStageId, joinType, Collections.singletonList(new JoinNode.JoinClause(
         leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
index b99075429a..ed1fc9ba3e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
@@ -20,9 +20,12 @@ package org.apache.pinot.query.planner.nodes;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable;
+import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils;
 
 
-public abstract class AbstractStageNode implements StageNode {
+public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
 
   protected final int _stageId;
   protected final List<StageNode> _inputs;
@@ -46,4 +49,14 @@ public abstract class AbstractStageNode implements StageNode {
   public int getStageId() {
     return _stageId;
   }
+
+  @Override
+  public void setObjectField(Plan.ObjectField objectField) {
+    ProtoSerializationUtils.fromObjectField(this, objectField);
+  }
+
+  @Override
+  public Plan.ObjectField getObjectField() {
+    return ProtoSerializationUtils.toObjectField(this);
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
index b188b8e2f7..0aa8c94ec8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
@@ -19,8 +19,13 @@
 package org.apache.pinot.query.planner.nodes;
 
 
+
 public class CalcNode extends AbstractStageNode {
-  private final String _expression;
+  private String _expression;
+
+  public CalcNode(int stageId) {
+    super(stageId);
+  }
 
   public CalcNode(int stageId, String expression) {
     super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
index 94af122c74..bf380639d8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
@@ -18,15 +18,19 @@
  */
 package org.apache.pinot.query.planner.nodes;
 
-import java.io.Serializable;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 
 
 public class JoinNode extends AbstractStageNode {
-  private final JoinRelType _joinRelType;
-  private final List<JoinClause> _criteria;
+  private JoinRelType _joinRelType;
+  private List<JoinClause> _criteria;
+
+  public JoinNode(int stageId) {
+    super(stageId);
+  }
 
   public JoinNode(int stageId, JoinRelType joinRelType, List<JoinClause> criteria
   ) {
@@ -43,11 +47,14 @@ public class JoinNode extends AbstractStageNode {
     return _criteria;
   }
 
-  public static class JoinClause implements Serializable {
-    private final KeySelector<Object[], Object> _leftJoinKeySelector;
-    private final KeySelector<Object[], Object> _rightJoinKeySelector;
+  public static class JoinClause {
+    private KeySelector<Object[], Object> _leftJoinKeySelector;
+    private KeySelector<Object[], Object> _rightJoinKeySelector;
+
+    public JoinClause() {
+    }
 
-    public JoinClause(KeySelector<Object[], Object> leftKeySelector, KeySelector<Object[], Object> rightKeySelector) {
+    public JoinClause(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
       _leftJoinKeySelector = leftKeySelector;
       _rightJoinKeySelector = rightKeySelector;
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
index d8269346f3..8f0c619b79 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
@@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution;
 
 
 public class MailboxReceiveNode extends AbstractStageNode {
-  private final int _senderStageId;
-  private final RelDistribution.Type _exchangeType;
+  private int _senderStageId;
+  private RelDistribution.Type _exchangeType;
+
+  public MailboxReceiveNode(int stageId) {
+    super(stageId);
+  }
 
   public MailboxReceiveNode(int stageId, int senderStageId, RelDistribution.Type exchangeType) {
     super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
index ea39ad3493..9867a16f61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
@@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution;
 
 
 public class MailboxSendNode extends AbstractStageNode {
-  private final int _receiverStageId;
-  private final RelDistribution.Type _exchangeType;
+  private int _receiverStageId;
+  private RelDistribution.Type _exchangeType;
+
+  public MailboxSendNode(int stageId) {
+    super(stageId);
+  }
 
   public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) {
     super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java
new file mode 100644
index 0000000000..ad7184cdb1
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import org.apache.pinot.common.proto.Plan;
+
+
+public final class SerDeUtils {
+  private SerDeUtils() {
+    // do not instantiate.
+  }
+
+  public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) {
+    AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
+    stageNode.setObjectField(protoNode.getObjectField());
+    for (Plan.StageNode protoChild : protoNode.getInputsList()) {
+      stageNode.addInput(deserializeStageNode(protoChild));
+    }
+    return stageNode;
+  }
+
+  public static Plan.StageNode serializeStageNode(AbstractStageNode stageNode) {
+    Plan.StageNode.Builder builder = Plan.StageNode.newBuilder()
+        .setStageId(stageNode.getStageId())
+        .setNodeName(stageNode.getClass().getSimpleName())
+        .setObjectField(stageNode.getObjectField());
+    for (StageNode childNode : stageNode.getInputs()) {
+      builder.addInputs(serializeStageNode((AbstractStageNode) childNode));
+    }
+    return builder.build();
+  }
+
+  private static AbstractStageNode newNodeInstance(String nodeName, int stageId) {
+    switch (nodeName) {
+      case "TableScanNode":
+        return new TableScanNode(stageId);
+      case "JoinNode":
+        return new JoinNode(stageId);
+      case "CalcNode":
+        return new CalcNode(stageId);
+      case "MailboxSendNode":
+        return new MailboxSendNode(stageId);
+      case "MailboxReceiveNode":
+        return new MailboxReceiveNode(stageId);
+      default:
+        throw new IllegalArgumentException("Unknown node name: " + nodeName);
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
index 8d78ec6d0f..9375a7e986 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
@@ -22,8 +22,12 @@ import java.util.List;
 
 
 public class TableScanNode extends AbstractStageNode {
-  private final String _tableName;
-  private final List<String> _tableScanColumns;
+  private String _tableName;
+  private List<String> _tableScanColumns;
+
+  public TableScanNode(int stageId) {
+    super(stageId);
+  }
 
   public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) {
     super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
similarity index 69%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
index b188b8e2f7..2b99003e87 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
@@ -16,18 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.planner.nodes;
+/**
+ * Contract for objects that can be populated from, and converted to, a protobuf
+ * {@code Plan.ObjectField}.
+ *
+ * Implemented by {@code AbstractStageNode}.
+ */
+package org.apache.pinot.query.planner.nodes.serde;
+
+import org.apache.pinot.common.proto.Plan;
 
 
-public class CalcNode extends AbstractStageNode {
-  private final String _expression;
+public interface ProtoSerializable {
 
-  public CalcNode(int stageId, String expression) {
-    super(stageId);
-    _expression = expression;
-  }
+  void setObjectField(Plan.ObjectField objFields);
 
-  public String getExpression() {
-    return _expression;
-  }
+  Plan.ObjectField getObjectField();
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java
new file mode 100644
index 0000000000..c30295a101
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes.serde;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.proto.Plan;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ProtoSerializationUtils {
+  private static final String ENUM_VALUE_KEY = "ENUM_VALUE_KEY";
+
+  private ProtoSerializationUtils() {
+    // do not instantiate.
+  }
+
+  public static void fromObjectField(Object object, Plan.ObjectField objectField) {
+    Map<String, Plan.MemberVariableField> memberVariablesMap = objectField.getMemberVariablesMap();
+    try {
+      for (Map.Entry<String, Plan.MemberVariableField> e : memberVariablesMap.entrySet()) {
+        Object memberVarObject = constructMemberVariable(e.getValue());
+        if (memberVarObject != null) {
+          Field declaredField = object.getClass().getDeclaredField(e.getKey());
+          declaredField.setAccessible(true);
+          declaredField.set(object, memberVarObject);
+        }
+      }
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new IllegalStateException("Unable to set Object field for: " + objectField.getObjectClassName(), e);
+    }
+  }
+
+  public static Plan.ObjectField toObjectField(Object object) {
+    Plan.ObjectField.Builder builder = Plan.ObjectField.newBuilder();
+    builder.setObjectClassName(object.getClass().getName());
+    // special handling for enum
+    if (object instanceof Enum) {
+      builder.putMemberVariables(ENUM_VALUE_KEY, serializeMemberVariable(((Enum) object).name()));
+    } else {
+      try {
+        for (Field field : object.getClass().getDeclaredFields()) {
+          field.setAccessible(true);
+          Object fieldObject = field.get(object);
+          builder.putMemberVariables(field.getName(), serializeMemberVariable(fieldObject));
+        }
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("Unable to serialize Object: " + object.getClass(), e);
+      }
+    }
+    return builder.build();
+  }
+
+  // --------------------------------------------------------------------------
+  // Serialize Utils
+  // --------------------------------------------------------------------------
+
+  private static Plan.LiteralField boolField(boolean val) {
+    return Plan.LiteralField.newBuilder().setBoolField(val).build();
+  }
+
+  private static Plan.LiteralField intField(int val) {
+    return Plan.LiteralField.newBuilder().setIntField(val).build();
+  }
+
+  private static Plan.LiteralField longField(long val) {
+    return Plan.LiteralField.newBuilder().setLongField(val).build();
+  }
+
+  private static Plan.LiteralField doubleField(double val) {
+    return Plan.LiteralField.newBuilder().setDoubleField(val).build();
+  }
+
+  private static Plan.LiteralField stringField(String val) {
+    return Plan.LiteralField.newBuilder().setStringField(val).build();
+  }
+
+  private static Plan.MemberVariableField serializeMemberVariable(Object fieldObject) {
+    Plan.MemberVariableField.Builder builder = Plan.MemberVariableField.newBuilder();
+    if (fieldObject instanceof Boolean) {
+      builder.setLiteralField(boolField((Boolean) fieldObject));
+    } else if (fieldObject instanceof Integer) {
+      builder.setLiteralField(intField((Integer) fieldObject));
+    } else if (fieldObject instanceof Long) {
+      builder.setLiteralField(longField((Long) fieldObject));
+    } else if (fieldObject instanceof Double) {
+      builder.setLiteralField(doubleField((Double) fieldObject));
+    } else if (fieldObject instanceof String) {
+      builder.setLiteralField(stringField((String) fieldObject));
+    } else if (fieldObject instanceof List) {
+      builder.setListField(serializeListMemberVariable(fieldObject));
+    } else if (fieldObject instanceof Map) {
+      builder.setMapField(serializeMapMemberVariable(fieldObject));
+    } else {
+      builder.setObjectField(toObjectField(fieldObject));
+    }
+    return builder.build();
+  }
+
+  private static Plan.ListField serializeListMemberVariable(Object fieldObject) {
+    Preconditions.checkState(fieldObject instanceof List);
+    Plan.ListField.Builder builder = Plan.ListField.newBuilder();
+    for (Object e : (List) fieldObject) {
+      builder.addContent(serializeMemberVariable(e));
+    }
+    return builder.build();
+  }
+
+  private static Plan.MapField serializeMapMemberVariable(Object fieldObject) {
+    Preconditions.checkState(fieldObject instanceof Map);
+    Plan.MapField.Builder builder = Plan.MapField.newBuilder();
+    Set<Map.Entry<String, Object>> entrySet = ((Map) fieldObject).entrySet();
+    for (Map.Entry<String, Object> e : entrySet) {
+      builder.putContent(e.getKey(), serializeMemberVariable(e.getValue()));
+    }
+    return builder.build();
+  }
+
+  // --------------------------------------------------------------------------
+  // Deserialize Utils
+  // --------------------------------------------------------------------------
+
+  private static Object constructMemberVariable(Plan.MemberVariableField memberVariableField) {
+    switch (memberVariableField.getMemberVariableFieldCase()) {
+      case LITERALFIELD:
+        return constructLiteral(memberVariableField.getLiteralField());
+      case LISTFIELD:
+        return constructList(memberVariableField.getListField());
+      case MAPFIELD:
+        return constructMap(memberVariableField.getMapField());
+      case OBJECTFIELD:
+        return constructObject(memberVariableField.getObjectField());
+      case MEMBERVARIABLEFIELD_NOT_SET:
+      default:
+        return null;
+    }
+  }
+
+  private static Object constructLiteral(Plan.LiteralField literalField) {
+    switch (literalField.getLiteralFieldCase()) {
+      case BOOLFIELD:
+        return literalField.getBoolField();
+      case INTFIELD:
+        return literalField.getIntField();
+      case LONGFIELD:
+        return literalField.getLongField();
+      case DOUBLEFIELD:
+        return literalField.getDoubleField();
+      case STRINGFIELD:
+        return literalField.getStringField();
+      case LITERALFIELD_NOT_SET:
+      default:
+        return null;
+    }
+  }
+
+  private static List constructList(Plan.ListField listField) {
+    List list = new ArrayList();
+    for (Plan.MemberVariableField e : listField.getContentList()) {
+      list.add(constructMemberVariable(e));
+    }
+    return list;
+  }
+
+  private static Object constructMap(Plan.MapField mapField) {
+    Map map = new HashMap();
+    for (Map.Entry<String, Plan.MemberVariableField> e : mapField.getContentMap().entrySet()) {
+      map.put(e.getKey(), constructMemberVariable(e.getValue()));
+    }
+    return map;
+  }
+
+  private static Object constructObject(Plan.ObjectField objectField) {
+    try {
+      Class<?> clazz = Class.forName(objectField.getObjectClassName());
+      if (clazz.isEnum()) {
+        return Enum.valueOf((Class<Enum>) clazz,
+            objectField.getMemberVariablesOrDefault(ENUM_VALUE_KEY, null).getLiteralField().getStringField());
+      } else {
+        Object obj = clazz.newInstance();
+        fromObjectField(obj, objectField);
+        return obj;
+      }
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new IllegalStateException("Unable to create Object of type: " + objectField.getObjectClassName(), e);
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 0b846e555c..95991d558b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -26,10 +26,17 @@ public class FieldSelectionKeySelector implements KeySelector<Object[], Object>
 
   private int _columnIndex;
 
+  public FieldSelectionKeySelector() {
+  }
+
   public FieldSelectionKeySelector(int columnIndex) {
     _columnIndex = columnIndex;
   }
 
+  public int getColumnIndex() {
+    return _columnIndex;
+  }
+
   @Override
   public Object getKey(Object[] input) {
     return input[_columnIndex];
diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml
index 3607e51e31..03fa270ea4 100644
--- a/pinot-query-runtime/pom.xml
+++ b/pinot-query-runtime/pom.xml
@@ -26,7 +26,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.10.0-SNAPSHOT</version>
+    <version>0.11.0-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-query-runtime</artifactId>
   <name>Pinot Query Runtime</name>
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 1b4ecbbdc0..358ebb8465 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.AbstractStageNode;
+import org.apache.pinot.query.planner.nodes.SerDeUtils;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 
@@ -41,7 +43,7 @@ public class QueryPlanSerDeUtils {
   public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
     distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
-    distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageRoot(stagePlan.getSerializedStageRoot()));
+    distributedStagePlan.setStageRoot(SerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
     Map<Integer, Worker.StageMetadata> metadataMap = stagePlan.getStageMetadataMap();
     distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
     return distributedStagePlan;
@@ -51,7 +53,7 @@ public class QueryPlanSerDeUtils {
     return Worker.StagePlan.newBuilder()
         .setStageId(distributedStagePlan.getStageId())
         .setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
-        .setSerializedStageRoot(StageNodeSerDeUtils.serializeStageRoot(distributedStagePlan.getStageRoot()))
+        .setStageRoot(SerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
         .putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
deleted file mode 100644
index 80370128cf..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.plan.serde;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import org.apache.pinot.query.planner.nodes.StageNode;
-
-
-public class StageNodeSerDeUtils {
-  private StageNodeSerDeUtils() {
-    // do not instantiate.
-  }
-
-  public static StageNode deserializeStageRoot(ByteString serializeStagePlan) {
-    try (ByteArrayInputStream bs = new ByteArrayInputStream(serializeStagePlan.toByteArray());
-        ObjectInputStream is = new ObjectInputStream(bs)) {
-      Object o = is.readObject();
-      Preconditions.checkState(o instanceof StageNode, "invalid worker query request object");
-      return (StageNode) o;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static ByteString serializeStageRoot(StageNode stageRoot) {
-    try (ByteArrayOutputStream bs = new ByteArrayOutputStream();
-        ObjectOutputStream os = new ObjectOutputStream(bs)) {
-      os.writeObject(stageRoot);
-      return ByteString.copyFrom(bs.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 539d5d9a53..eeed6d20d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -20,29 +20,37 @@ package org.apache.pinot.query.service;
 
 import com.google.common.collect.Lists;
 import io.grpc.ManagedChannelBuilder;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.QueryEnvironmentTestUtils;
 import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.StageNode;
 import org.apache.pinot.query.routing.WorkerInstance;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+
 
 public class QueryServerTest {
   private static final int QUERY_SERVER_COUNT = 2;
-  private Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
-  private Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
+  private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+  private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
+  private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
 
   private QueryEnvironment _queryEnvironment;
 
@@ -52,9 +60,11 @@ public class QueryServerTest {
 
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
-      QueryServer queryServer = new QueryServer(availablePort, Mockito.mock(QueryRunner.class));
+      QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+      QueryServer queryServer = new QueryServer(availablePort, queryRunner);
       queryServer.start();
       _queryServerMap.put(availablePort, queryServer);
+      _queryRunnerMap.put(availablePort, queryRunner);
       // this only test the QueryServer functionality so the server port can be the same as the mailbox port.
       // this is only use for test identifier purpose.
       _queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort));
@@ -73,17 +83,77 @@ public class QueryServerTest {
     }
   }
 
+  /**
+   * Plans a join query and, for every non-reduce stage, submits the request built for that stage, then waits
+   * until the mock {@link QueryRunner} registered under the port of the stage's first server instance has been
+   * handed a stage plan whose node tree and stage metadata match the planned ones.
+   */
+  @SuppressWarnings("unchecked")
   @Test
   public void testWorkerAcceptsWorkerRequestCorrect()
       throws Exception {
     QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
 
-    int singleServerStageId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1);
+    for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+      if (stageId > 0) { // we do not test reduce stage.
+        Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId);
+
+        // submit the request for testing.
+        submitRequest(queryRequest);
+
+        StageMetadata stageMetadata = queryPlan.getStageMetadataMap().get(stageId);
+        StageNode expectedStageRoot = queryPlan.getQueryStageMap().get(stageId);
+
+        // ensure mock query runner received correctly deserialized payload.
+        // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
+        QueryRunner mockRunner = _queryRunnerMap.get(stageMetadata.getServerInstances().get(0).getPort());
+        TestUtils.waitForCondition(aVoid -> {
+          try {
+            Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan ->
+                isStageNodesEqual(expectedStageRoot, distributedStagePlan.getStageRoot())
+                    && isMetadataMapsEqual(stageMetadata, distributedStagePlan.getMetadataMap().get(stageId))),
+                any(ExecutorService.class), any(Map.class));
+            return true;
+          } catch (Throwable t) {
+            // verification did not succeed; report "not received yet" and let waitForCondition decide when to give up.
+            return false;
+          }
+        }, 1000L, "Error verifying mock QueryRunner intercepted query payload!");
+      }
+    }
+  }
 
-    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, singleServerStageId);
+  /**
+   * Returns whether two {@link StageMetadata} agree on their server instances, their server-instance-to-segments
+   * map and their scanned tables.
+   */
+  private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata right) {
+    return left.getServerInstances().equals(right.getServerInstances())
+        && left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap())
+        && left.getScannedTables().equals(right.getScannedTables());
+  }
 
-    // submit the request for testing.
-    submitRequest(queryRequest);
+  /**
+   * Structurally compares two stage node trees: stage id, concrete class and number of inputs must match at every
+   * level, with the inputs of each node paired up after ordering both sides by stage id. Other node fields are
+   * not inspected.
+   *
+   * <p>The ordering is done on copies so the comparison never reorders the inputs of the plans being compared.
+   */
+  private static boolean isStageNodesEqual(StageNode left, StageNode right) {
+    if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass()
+        || left.getInputs().size() != right.getInputs().size()) {
+      return false;
+    }
+    Comparator<StageNode> byStageId = Comparator.comparingInt(StageNode::getStageId);
+    StageNode[] leftInputs = left.getInputs().stream().sorted(byStageId).toArray(StageNode[]::new);
+    StageNode[] rightInputs = right.getInputs().stream().sorted(byStageId).toArray(StageNode[]::new);
+    for (int i = 0; i < leftInputs.length; i++) {
+      if (!isStageNodesEqual(leftInputs[i], rightInputs[i])) {
+        return false;
+      }
+    }
+    return true;
   }
 
   private void submitRequest(Worker.QueryRequest queryRequest) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org