You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/04/15 21:24:45 UTC
[pinot] branch multi_stage_query_engine updated: Use proto for query plan serialization (#8479)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/multi_stage_query_engine by this push:
new 791248a318 Use proto for query plan serialization (#8479)
791248a318 is described below
commit 791248a3188d70bad6d3d69e4580dddbfe206c16
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Apr 15 14:24:38 2022 -0700
Use proto for query plan serialization (#8479)
* serializable format
* fix linter
* bump version since feature branch based bumped
* fix auto-rebase error
* fix test async mock intercept issue
* use reflection for ser/de
* also fix test coverage on all node types
* fix java8
* fix JOINNode member type as well as plan.proto comments
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../pinot/common/config/provider/TableCache.java | 10 -
.../src/main/proto/{worker.proto => plan.proto} | 66 ++++---
pinot-common/src/main/proto/worker.proto | 4 +-
pinot-query-planner/pom.xml | 2 +-
.../pinot/query/planner/RelToStageConverter.java | 5 +-
.../query/planner/nodes/AbstractStageNode.java | 15 +-
.../apache/pinot/query/planner/nodes/CalcNode.java | 7 +-
.../apache/pinot/query/planner/nodes/JoinNode.java | 21 ++-
.../query/planner/nodes/MailboxReceiveNode.java | 8 +-
.../pinot/query/planner/nodes/MailboxSendNode.java | 8 +-
.../pinot/query/planner/nodes/SerDeUtils.java | 65 +++++++
.../pinot/query/planner/nodes/TableScanNode.java | 8 +-
.../ProtoSerializable.java} | 22 ++-
.../nodes/serde/ProtoSerializationUtils.java | 209 +++++++++++++++++++++
.../partitioning/FieldSelectionKeySelector.java | 7 +
pinot-query-runtime/pom.xml | 2 +-
.../runtime/plan/serde/QueryPlanSerDeUtils.java | 6 +-
.../runtime/plan/serde/StageNodeSerDeUtils.java | 56 ------
.../pinot/query/service/QueryServerTest.java | 66 ++++++-
19 files changed, 458 insertions(+), 129 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 8d3499c653..1f5d286608 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -217,16 +217,6 @@ public class TableCache implements PinotConfigProvider {
}
}
- /**
- * Return a map between lower-case table name and their canonicalized form. Key-value pair are only different in
- * case-sensitive environment.
- *
- * @return the table name map.
- */
- public Map<String, String> getTableNameMap() {
- return _tableNameMap;
- }
-
private void addTableConfigs(List<String> paths) {
// Subscribe data changes before reading the data to avoid missing changes
for (String path : paths) {
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/plan.proto
similarity index 54%
copy from pinot-common/src/main/proto/worker.proto
copy to pinot-common/src/main/proto/plan.proto
index c64798fa63..47018197fc 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -39,36 +39,56 @@ syntax = "proto3";
package org.apache.pinot.common.proto;
-service PinotQueryWorker {
- // Dispatch a QueryRequest to a PinotQueryWorker
- rpc Submit(QueryRequest) returns (QueryResponse);
+message StageNode {
+ int32 stageId = 1;
+ string nodeName = 2;
+ repeated StageNode inputs = 3;
+ ObjectField objectField = 4;
}
-// QueryRequest is the dispatched content for a specific query stage on a specific worker.
-message QueryRequest {
- map<string, string> metadata = 1;
- StagePlan stagePlan = 2;
+// MemberVariableField defines the serialized format of the member variables of a class object.
+// MemberVariableField can be one of
+// 1. literal
+// 2. list
+// 3. map
+// 4. complex class object
+message MemberVariableField {
+ oneof member_variable_field {
+ LiteralField literalField = 1;
+ ListField listField = 2;
+ MapField mapField = 3;
+ ObjectField objectField = 4;
+ }
}
-// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status.
-message QueryResponse {
- map<string, string> metadata = 1;
- bytes payload = 2;
+// ObjectField defines the serialized format of a complex class object.
+// It contains:
+// 1. its fully-qualified class name;
+// 2. its MemberVariableField map.
+message ObjectField {
+ string objectClassName = 1;
+ map<string, MemberVariableField> memberVariables = 2;
}
-message StagePlan {
- int32 stageId = 1;
- string instanceId = 2;
- bytes serializedStageRoot = 3;
- map<int32, StageMetadata> stageMetadata = 4;
+// LiteralField defines the serialized format of a literal field.
+message LiteralField {
+ oneof literal_field {
+ bool boolField = 1;
+ int32 intField = 2;
+ int64 longField = 3;
+ double doubleField = 4;
+ string stringField = 5;
+ }
}
-message StageMetadata {
- repeated string instances = 1;
- repeated string dataSources = 2;
- map<string, SegmentList> instanceToSegmentList = 3;
+// ListField defines the serialized format of a list field.
+// The content of the list is a MemberVariableField.
+message ListField {
+ repeated MemberVariableField content = 1;
}
-message SegmentList {
- repeated string segments = 1;
-}
+// MapField defines the serialized format of a map field.
+// The key of the map is a string and the value of the map is a MemberVariableField.
+message MapField {
+ map<string, MemberVariableField> content = 1;
+}
\ No newline at end of file
diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto
index c64798fa63..87aecc8391 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -39,6 +39,8 @@ syntax = "proto3";
package org.apache.pinot.common.proto;
+import "plan.proto";
+
service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);
@@ -59,7 +61,7 @@ message QueryResponse {
message StagePlan {
int32 stageId = 1;
string instanceId = 2;
- bytes serializedStageRoot = 3;
+ StageNode stageRoot = 3;
map<int32, StageMetadata> stageMetadata = 4;
}
diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml
index 8d1af64a19..05b9461cdc 100644
--- a/pinot-query-planner/pom.xml
+++ b/pinot-query-planner/pom.xml
@@ -26,7 +26,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
</parent>
<artifactId>pinot-query-planner</artifactId>
<name>Pinot Query Planner</name>
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
index e558694aab..572302ef92 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java
@@ -37,7 +37,6 @@ import org.apache.pinot.query.planner.nodes.JoinNode;
import org.apache.pinot.query.planner.nodes.StageNode;
import org.apache.pinot.query.planner.nodes.TableScanNode;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
/**
@@ -93,8 +92,8 @@ public final class RelToStageConverter {
RelDataType rightRowType = node.getRight().getRowType();
int leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex();
int rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex();
- KeySelector<Object[], Object> leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex);
- KeySelector<Object[], Object> rightFieldSelectionKeySelector =
+ FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex);
+ FieldSelectionKeySelector rightFieldSelectionKeySelector =
new FieldSelectionKeySelector(rightOperandIndex - leftRowType.getFieldNames().size());
return new JoinNode(currentStageId, joinType, Collections.singletonList(new JoinNode.JoinClause(
leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
index b99075429a..ed1fc9ba3e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java
@@ -20,9 +20,12 @@ package org.apache.pinot.query.planner.nodes;
import java.util.ArrayList;
import java.util.List;
+import org.apache.pinot.common.proto.Plan;
+import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable;
+import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils;
-public abstract class AbstractStageNode implements StageNode {
+public abstract class AbstractStageNode implements StageNode, ProtoSerializable {
protected final int _stageId;
protected final List<StageNode> _inputs;
@@ -46,4 +49,14 @@ public abstract class AbstractStageNode implements StageNode {
public int getStageId() {
return _stageId;
}
+
+ @Override
+ public void setObjectField(Plan.ObjectField objectField) {
+ ProtoSerializationUtils.fromObjectField(this, objectField);
+ }
+
+ @Override
+ public Plan.ObjectField getObjectField() {
+ return ProtoSerializationUtils.toObjectField(this);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
index b188b8e2f7..0aa8c94ec8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
@@ -19,8 +19,13 @@
package org.apache.pinot.query.planner.nodes;
+
public class CalcNode extends AbstractStageNode {
- private final String _expression;
+ private String _expression;
+
+ public CalcNode(int stageId) {
+ super(stageId);
+ }
public CalcNode(int stageId, String expression) {
super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
index 94af122c74..bf380639d8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java
@@ -18,15 +18,19 @@
*/
package org.apache.pinot.query.planner.nodes;
-import java.io.Serializable;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
public class JoinNode extends AbstractStageNode {
- private final JoinRelType _joinRelType;
- private final List<JoinClause> _criteria;
+ private JoinRelType _joinRelType;
+ private List<JoinClause> _criteria;
+
+ public JoinNode(int stageId) {
+ super(stageId);
+ }
public JoinNode(int stageId, JoinRelType joinRelType, List<JoinClause> criteria
) {
@@ -43,11 +47,14 @@ public class JoinNode extends AbstractStageNode {
return _criteria;
}
- public static class JoinClause implements Serializable {
- private final KeySelector<Object[], Object> _leftJoinKeySelector;
- private final KeySelector<Object[], Object> _rightJoinKeySelector;
+ public static class JoinClause {
+ private KeySelector<Object[], Object> _leftJoinKeySelector;
+ private KeySelector<Object[], Object> _rightJoinKeySelector;
+
+ public JoinClause() {
+ }
- public JoinClause(KeySelector<Object[], Object> leftKeySelector, KeySelector<Object[], Object> rightKeySelector) {
+ public JoinClause(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
_leftJoinKeySelector = leftKeySelector;
_rightJoinKeySelector = rightKeySelector;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
index d8269346f3..8f0c619b79 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java
@@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution;
public class MailboxReceiveNode extends AbstractStageNode {
- private final int _senderStageId;
- private final RelDistribution.Type _exchangeType;
+ private int _senderStageId;
+ private RelDistribution.Type _exchangeType;
+
+ public MailboxReceiveNode(int stageId) {
+ super(stageId);
+ }
public MailboxReceiveNode(int stageId, int senderStageId, RelDistribution.Type exchangeType) {
super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
index ea39ad3493..9867a16f61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java
@@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution;
public class MailboxSendNode extends AbstractStageNode {
- private final int _receiverStageId;
- private final RelDistribution.Type _exchangeType;
+ private int _receiverStageId;
+ private RelDistribution.Type _exchangeType;
+
+ public MailboxSendNode(int stageId) {
+ super(stageId);
+ }
public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) {
super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java
new file mode 100644
index 0000000000..ad7184cdb1
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes;
+
+import org.apache.pinot.common.proto.Plan;
+
+
+public final class SerDeUtils {
+ private SerDeUtils() {
+ // do not instantiate.
+ }
+
+ public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) {
+ AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId());
+ stageNode.setObjectField(protoNode.getObjectField());
+ for (Plan.StageNode protoChild : protoNode.getInputsList()) {
+ stageNode.addInput(deserializeStageNode(protoChild));
+ }
+ return stageNode;
+ }
+
+ public static Plan.StageNode serializeStageNode(AbstractStageNode stageNode) {
+ Plan.StageNode.Builder builder = Plan.StageNode.newBuilder()
+ .setStageId(stageNode.getStageId())
+ .setNodeName(stageNode.getClass().getSimpleName())
+ .setObjectField(stageNode.getObjectField());
+ for (StageNode childNode : stageNode.getInputs()) {
+ builder.addInputs(serializeStageNode((AbstractStageNode) childNode));
+ }
+ return builder.build();
+ }
+
+ private static AbstractStageNode newNodeInstance(String nodeName, int stageId) {
+ switch (nodeName) {
+ case "TableScanNode":
+ return new TableScanNode(stageId);
+ case "JoinNode":
+ return new JoinNode(stageId);
+ case "CalcNode":
+ return new CalcNode(stageId);
+ case "MailboxSendNode":
+ return new MailboxSendNode(stageId);
+ case "MailboxReceiveNode":
+ return new MailboxReceiveNode(stageId);
+ default:
+ throw new IllegalArgumentException("Unknown node name: " + nodeName);
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
index 8d78ec6d0f..9375a7e986 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java
@@ -22,8 +22,12 @@ import java.util.List;
public class TableScanNode extends AbstractStageNode {
- private final String _tableName;
- private final List<String> _tableScanColumns;
+ private String _tableName;
+ private List<String> _tableScanColumns;
+
+ public TableScanNode(int stageId) {
+ super(stageId);
+ }
public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) {
super(stageId);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
similarity index 69%
copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
index b188b8e2f7..2b99003e87 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java
@@ -16,18 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.planner.nodes;
+/**
+ * Contract for classes that can expose their state as a Plan.ObjectField
+ * (getObjectField) and be populated from one (setObjectField).
+ *
+ * NOTE: this interface is hand-written; it is not generated by any compiler.
+ */
+package org.apache.pinot.query.planner.nodes.serde;
+
+import org.apache.pinot.common.proto.Plan;
-public class CalcNode extends AbstractStageNode {
- private final String _expression;
+public interface ProtoSerializable {
- public CalcNode(int stageId, String expression) {
- super(stageId);
- _expression = expression;
- }
+ void setObjectField(Plan.ObjectField objFields);
- public String getExpression() {
- return _expression;
- }
+ Plan.ObjectField getObjectField();
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java
new file mode 100644
index 0000000000..c30295a101
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.nodes.serde;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.proto.Plan;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ProtoSerializationUtils {
+ private static final String ENUM_VALUE_KEY = "ENUM_VALUE_KEY";
+
+ private ProtoSerializationUtils() {
+ // do not instantiate.
+ }
+
+ public static void fromObjectField(Object object, Plan.ObjectField objectField) {
+ Map<String, Plan.MemberVariableField> memberVariablesMap = objectField.getMemberVariablesMap();
+ try {
+ for (Map.Entry<String, Plan.MemberVariableField> e : memberVariablesMap.entrySet()) {
+ Object memberVarObject = constructMemberVariable(e.getValue());
+ if (memberVarObject != null) {
+ Field declaredField = object.getClass().getDeclaredField(e.getKey());
+ declaredField.setAccessible(true);
+ declaredField.set(object, memberVarObject);
+ }
+ }
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new IllegalStateException("Unable to set Object field for: " + objectField.getObjectClassName(), e);
+ }
+ }
+
+ public static Plan.ObjectField toObjectField(Object object) {
+ Plan.ObjectField.Builder builder = Plan.ObjectField.newBuilder();
+ builder.setObjectClassName(object.getClass().getName());
+ // special handling for enum
+ if (object instanceof Enum) {
+ builder.putMemberVariables(ENUM_VALUE_KEY, serializeMemberVariable(((Enum) object).name()));
+ } else {
+ try {
+ for (Field field : object.getClass().getDeclaredFields()) {
+ field.setAccessible(true);
+ Object fieldObject = field.get(object);
+ builder.putMemberVariables(field.getName(), serializeMemberVariable(fieldObject));
+ }
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to serialize Object: " + object.getClass(), e);
+ }
+ }
+ return builder.build();
+ }
+
+ // --------------------------------------------------------------------------
+ // Serialize Utils
+ // --------------------------------------------------------------------------
+
+ private static Plan.LiteralField boolField(boolean val) {
+ return Plan.LiteralField.newBuilder().setBoolField(val).build();
+ }
+
+ private static Plan.LiteralField intField(int val) {
+ return Plan.LiteralField.newBuilder().setIntField(val).build();
+ }
+
+ private static Plan.LiteralField longField(long val) {
+ return Plan.LiteralField.newBuilder().setLongField(val).build();
+ }
+
+ private static Plan.LiteralField doubleField(double val) {
+ return Plan.LiteralField.newBuilder().setDoubleField(val).build();
+ }
+
+ private static Plan.LiteralField stringField(String val) {
+ return Plan.LiteralField.newBuilder().setStringField(val).build();
+ }
+
+ private static Plan.MemberVariableField serializeMemberVariable(Object fieldObject) {
+ Plan.MemberVariableField.Builder builder = Plan.MemberVariableField.newBuilder();
+ if (fieldObject instanceof Boolean) {
+ builder.setLiteralField(boolField((Boolean) fieldObject));
+ } else if (fieldObject instanceof Integer) {
+ builder.setLiteralField(intField((Integer) fieldObject));
+ } else if (fieldObject instanceof Long) {
+ builder.setLiteralField(longField((Long) fieldObject));
+ } else if (fieldObject instanceof Double) {
+ builder.setLiteralField(doubleField((Double) fieldObject));
+ } else if (fieldObject instanceof String) {
+ builder.setLiteralField(stringField((String) fieldObject));
+ } else if (fieldObject instanceof List) {
+ builder.setListField(serializeListMemberVariable(fieldObject));
+ } else if (fieldObject instanceof Map) {
+ builder.setMapField(serializeMapMemberVariable(fieldObject));
+ } else {
+ builder.setObjectField(toObjectField(fieldObject));
+ }
+ return builder.build();
+ }
+
+ private static Plan.ListField serializeListMemberVariable(Object fieldObject) {
+ Preconditions.checkState(fieldObject instanceof List);
+ Plan.ListField.Builder builder = Plan.ListField.newBuilder();
+ for (Object e : (List) fieldObject) {
+ builder.addContent(serializeMemberVariable(e));
+ }
+ return builder.build();
+ }
+
+ private static Plan.MapField serializeMapMemberVariable(Object fieldObject) {
+ Preconditions.checkState(fieldObject instanceof Map);
+ Plan.MapField.Builder builder = Plan.MapField.newBuilder();
+ Set<Map.Entry<String, Object>> entrySet = ((Map) fieldObject).entrySet();
+ for (Map.Entry<String, Object> e : entrySet) {
+ builder.putContent(e.getKey(), serializeMemberVariable(e.getValue()));
+ }
+ return builder.build();
+ }
+
+ // --------------------------------------------------------------------------
+ // Deserialize Utils
+ // --------------------------------------------------------------------------
+
+ private static Object constructMemberVariable(Plan.MemberVariableField memberVariableField) {
+ switch (memberVariableField.getMemberVariableFieldCase()) {
+ case LITERALFIELD:
+ return constructLiteral(memberVariableField.getLiteralField());
+ case LISTFIELD:
+ return constructList(memberVariableField.getListField());
+ case MAPFIELD:
+ return constructMap(memberVariableField.getMapField());
+ case OBJECTFIELD:
+ return constructObject(memberVariableField.getObjectField());
+ case MEMBERVARIABLEFIELD_NOT_SET:
+ default:
+ return null;
+ }
+ }
+
+ private static Object constructLiteral(Plan.LiteralField literalField) {
+ switch (literalField.getLiteralFieldCase()) {
+ case BOOLFIELD:
+ return literalField.getBoolField();
+ case INTFIELD:
+ return literalField.getIntField();
+ case LONGFIELD:
+ return literalField.getLongField();
+ case DOUBLEFIELD:
+ return literalField.getDoubleField();
+ case STRINGFIELD:
+ return literalField.getStringField();
+ case LITERALFIELD_NOT_SET:
+ default:
+ return null;
+ }
+ }
+
+ private static List constructList(Plan.ListField listField) {
+ List list = new ArrayList();
+ for (Plan.MemberVariableField e : listField.getContentList()) {
+ list.add(constructMemberVariable(e));
+ }
+ return list;
+ }
+
+ private static Object constructMap(Plan.MapField mapField) {
+ Map map = new HashMap();
+ for (Map.Entry<String, Plan.MemberVariableField> e : mapField.getContentMap().entrySet()) {
+ map.put(e.getKey(), constructMemberVariable(e.getValue()));
+ }
+ return map;
+ }
+
+ private static Object constructObject(Plan.ObjectField objectField) {
+ try {
+ Class<?> clazz = Class.forName(objectField.getObjectClassName());
+ if (clazz.isEnum()) {
+ return Enum.valueOf((Class<Enum>) clazz,
+ objectField.getMemberVariablesOrDefault(ENUM_VALUE_KEY, null).getLiteralField().getStringField());
+ } else {
+ Object obj = clazz.newInstance();
+ fromObjectField(obj, objectField);
+ return obj;
+ }
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException("Unable to create Object of type: " + objectField.getObjectClassName(), e);
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
index 0b846e555c..95991d558b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java
@@ -26,10 +26,17 @@ public class FieldSelectionKeySelector implements KeySelector<Object[], Object>
private int _columnIndex;
+ public FieldSelectionKeySelector() {
+ }
+
public FieldSelectionKeySelector(int columnIndex) {
_columnIndex = columnIndex;
}
+ public int getColumnIndex() {
+ return _columnIndex;
+ }
+
@Override
public Object getKey(Object[] input) {
return input[_columnIndex];
diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml
index 3607e51e31..03fa270ea4 100644
--- a/pinot-query-runtime/pom.xml
+++ b/pinot-query-runtime/pom.xml
@@ -26,7 +26,7 @@
<parent>
<artifactId>pinot</artifactId>
<groupId>org.apache.pinot</groupId>
- <version>0.10.0-SNAPSHOT</version>
+ <version>0.11.0-SNAPSHOT</version>
</parent>
<artifactId>pinot-query-runtime</artifactId>
<name>Pinot Query Runtime</name>
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 1b4ecbbdc0..358ebb8465 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.AbstractStageNode;
+import org.apache.pinot.query.planner.nodes.SerDeUtils;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -41,7 +43,7 @@ public class QueryPlanSerDeUtils {
public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId());
distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId()));
- distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageRoot(stagePlan.getSerializedStageRoot()));
+ distributedStagePlan.setStageRoot(SerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
Map<Integer, Worker.StageMetadata> metadataMap = stagePlan.getStageMetadataMap();
distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap));
return distributedStagePlan;
@@ -51,7 +53,7 @@ public class QueryPlanSerDeUtils {
return Worker.StagePlan.newBuilder()
.setStageId(distributedStagePlan.getStageId())
.setInstanceId(instanceToString(distributedStagePlan.getServerInstance()))
- .setSerializedStageRoot(StageNodeSerDeUtils.serializeStageRoot(distributedStagePlan.getStageRoot()))
+ .setStageRoot(SerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot()))
.putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
deleted file mode 100644
index 80370128cf..0000000000
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.plan.serde;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import org.apache.pinot.query.planner.nodes.StageNode;
-
-
-public class StageNodeSerDeUtils {
- private StageNodeSerDeUtils() {
- // do not instantiate.
- }
-
- public static StageNode deserializeStageRoot(ByteString serializeStagePlan) {
- try (ByteArrayInputStream bs = new ByteArrayInputStream(serializeStagePlan.toByteArray());
- ObjectInputStream is = new ObjectInputStream(bs)) {
- Object o = is.readObject();
- Preconditions.checkState(o instanceof StageNode, "invalid worker query request object");
- return (StageNode) o;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static ByteString serializeStageRoot(StageNode stageRoot) {
- try (ByteArrayOutputStream bs = new ByteArrayOutputStream();
- ObjectOutputStream os = new ObjectOutputStream(bs)) {
- os.writeObject(stageRoot);
- return ByteString.copyFrom(bs.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 539d5d9a53..eeed6d20d9 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -20,29 +20,37 @@ package org.apache.pinot.query.service;
import com.google.common.collect.Lists;
import io.grpc.ManagedChannelBuilder;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestUtils;
import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.nodes.StageNode;
import org.apache.pinot.query.routing.WorkerInstance;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
+import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+
public class QueryServerTest {
private static final int QUERY_SERVER_COUNT = 2;
- private Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
- private Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
+ private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
+ private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>();
+ private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
private QueryEnvironment _queryEnvironment;
@@ -52,9 +60,11 @@ public class QueryServerTest {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryEnvironmentTestUtils.getAvailablePort();
- QueryServer queryServer = new QueryServer(availablePort, Mockito.mock(QueryRunner.class));
+ QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
+ QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
+ _queryRunnerMap.put(availablePort, queryRunner);
// this only tests the QueryServer functionality, so the server port can be the same as the mailbox port.
// the port is only used here as an identifier for the test.
_queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort));
@@ -73,17 +83,59 @@ public class QueryServerTest {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testWorkerAcceptsWorkerRequestCorrect()
throws Exception {
QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2");
- int singleServerStageId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1);
+ for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+ if (stageId > 0) { // we do not test reduce stage.
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId);
+
+ // submit the request for testing.
+ submitRequest(queryRequest);
+
+ StageMetadata stageMetadata = queryPlan.getStageMetadataMap().get(stageId);
+
+ // ensure mock query runner received correctly deserialized payload.
+ // since submitRequest is async, we need to wait for the mockRunner to receive the query payload.
+ QueryRunner mockRunner = _queryRunnerMap.get(stageMetadata.getServerInstances().get(0).getPort());
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> {
+ StageNode stageNode = queryPlan.getQueryStageMap().get(stageId);
+ return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot()) && isMetadataMapsEqual(
+ stageMetadata, distributedStagePlan.getMetadataMap().get(stageId));
+ }), any(ExecutorService.class), any(Map.class));
+ return true;
+ } catch (Throwable t) {
+ return false;
+ }
+ }, 1000L, "Error verifying mock QueryRunner intercepted query payload!");
+ }
+ }
+ }
- Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, singleServerStageId);
+ private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata right) {
+ return left.getServerInstances().equals(right.getServerInstances())
+ && left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap())
+ && left.getScannedTables().equals(right.getScannedTables());
+ }
- // submit the request for testing.
- submitRequest(queryRequest);
+ private static boolean isStageNodesEqual(StageNode left, StageNode right) {
+ if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass()
+ || left.getInputs().size() != right.getInputs().size()) {
+ return false;
+ }
+ left.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
+ right.getInputs().sort(Comparator.comparingInt(StageNode::getStageId));
+ for (int i = 0; i < left.getInputs().size(); i++) {
+ if (!isStageNodesEqual(left.getInputs().get(i), right.getInputs().get(i))) {
+ return false;
+ }
+ }
+ return true;
}
private void submitRequest(Worker.QueryRequest queryRequest) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org