You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/06/20 01:31:54 UTC
[beam] branch master updated: [BEAM-10267] ZetaSQLCalc makes grpc call per row
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 148cd8a [BEAM-10267] ZetaSQLCalc makes grpc call per row
new f3d5fc4 Merge pull request #12036 from apilloud/singleexp
148cd8a is described below
commit 148cd8a14d9188a81aff48a4b0bf99393c8658cb
Author: Andrew Pilloud <ap...@google.com>
AuthorDate: Tue Jun 16 15:40:55 2020 -0700
[BEAM-10267] ZetaSQLCalc makes grpc call per row
---
.../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 88 ++++++----------------
.../sql/zetasql/ZetaSqlBeamTranslationUtils.java | 2 +-
2 files changed, 26 insertions(+), 64 deletions(-)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 05b1bac..d92d9c3 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -20,14 +20,10 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
import com.google.zetasql.AnalyzerOptions;
import com.google.zetasql.PreparedExpression;
import com.google.zetasql.Value;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
@@ -46,15 +42,15 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSe
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlDialect;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlIdentifier;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
/**
* BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL}
@@ -97,19 +93,22 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
pinput.size());
PCollection<Row> upstream = pinput.get(0);
- final List<String> projects =
- getProgram().getProjectList().stream()
- .map(BeamZetaSqlCalcRel.this::unparseRexNode)
- .collect(Collectors.toList());
+ final RexBuilder rexBuilder = getCluster().getRexBuilder();
+ RexNode rex = rexBuilder.makeCall(SqlStdOperatorTable.ROW, getProgram().getProjectList());
+
final RexNode condition = getProgram().getCondition();
+ if (condition != null) {
+ rex =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.CASE, condition, rex, rexBuilder.makeNullLiteral(getRowType()));
+ }
boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
Schema outputSchema = CalciteUtils.toSchema(getRowType());
CalcFn calcFn =
new CalcFn(
- projects,
- condition == null ? null : unparseRexNode(condition),
+ context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
upstream.getSchema(),
outputSchema,
verifyRowValues);
@@ -121,32 +120,19 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
}
}
- private String unparseRexNode(RexNode rex) {
- return context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql();
- }
-
/**
* {@code CalcFn} is the executor for a {@link BeamZetaSqlCalcRel} step. The implementation is
* based on the {@code ZetaSQL} expression evaluator.
*/
private static class CalcFn extends DoFn<Row, Row> {
- private final List<String> projects;
- @Nullable private final String condition;
+ private final String sql;
private final Schema inputSchema;
private final Schema outputSchema;
private final boolean verifyRowValues;
- private transient List<PreparedExpression> projectExps;
- @Nullable private transient PreparedExpression conditionExp;
-
- CalcFn(
- List<String> projects,
- @Nullable String condition,
- Schema inputSchema,
- Schema outputSchema,
- boolean verifyRowValues) {
- Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount());
- this.projects = ImmutableList.copyOf(projects);
- this.condition = condition;
+ private transient PreparedExpression exp;
+
+ CalcFn(String sql, Schema inputSchema, Schema outputSchema, boolean verifyRowValues) {
+ this.sql = sql;
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
this.verifyRowValues = verifyRowValues;
@@ -162,17 +148,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
inputSchema.getField(i).getType()));
}
- // TODO[BEAM-8630]: use a single PreparedExpression for all condition and projects
- projectExps = new ArrayList<>();
- for (String project : projects) {
- PreparedExpression projectExp = new PreparedExpression(project);
- projectExp.prepare(options);
- projectExps.add(projectExp);
- }
- if (condition != null) {
- conditionExp = new PreparedExpression(condition);
- conditionExp.prepare(options);
- }
+ exp = new PreparedExpression(sql);
+ exp.prepare(options);
}
@ProcessElement
@@ -190,33 +167,18 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
// The map is empty because parameters in the query string have already been substituted.
Map<String, Value> params = Collections.emptyMap();
- if (conditionExp != null && !conditionExp.execute(columns, params).getBoolValue()) {
- return;
- }
-
- List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount());
- for (int i = 0; i < outputSchema.getFieldCount(); i++) {
- // TODO[BEAM-8630]: performance optimization by bundling the gRPC calls
- Value v = projectExps.get(i).execute(columns, params);
- values.add(
- ZetaSqlBeamTranslationUtils.zetaSqlValueToJavaObject(
- v, outputSchema.getField(i).getType(), verifyRowValues));
+ Value v = exp.execute(columns, params);
+ if (!v.isNull()) {
+ Row outputRow =
+ ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
+ v, outputSchema, verifyRowValues);
+ c.output(outputRow);
}
- Row outputRow =
- verifyRowValues
- ? Row.withSchema(outputSchema).addValues(values).build()
- : Row.withSchema(outputSchema).attachValues(values);
- c.output(outputRow);
}
@Teardown
public void teardown() {
- for (PreparedExpression projectExp : projectExps) {
- projectExp.close();
- }
- if (conditionExp != null) {
- conditionExp.close();
- }
+ exp.close();
}
}
}
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 61127b7..80d5c80 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -269,7 +269,7 @@ public final class ZetaSqlBeamTranslationUtils {
.collect(Collectors.toList());
}
- private static Row zetaSqlStructValueToBeamRow(
+ public static Row zetaSqlStructValueToBeamRow(
Value structValue, Schema schema, boolean verifyValues) {
List<Object> objects = new ArrayList<>(schema.getFieldCount());
List<Value> values = structValue.getFieldList();