You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/06/20 01:31:54 UTC

[beam] branch master updated: [BEAM-10267] ZetaSQLCalc makes grpc call per row

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 148cd8a  [BEAM-10267] ZetaSQLCalc makes grpc call per row
     new f3d5fc4  Merge pull request #12036 from apilloud/singleexp
148cd8a is described below

commit 148cd8a14d9188a81aff48a4b0bf99393c8658cb
Author: Andrew Pilloud <ap...@google.com>
AuthorDate: Tue Jun 16 15:40:55 2020 -0700

    [BEAM-10267] ZetaSQLCalc makes grpc call per row
---
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 88 ++++++----------------
 .../sql/zetasql/ZetaSqlBeamTranslationUtils.java   |  2 +-
 2 files changed, 26 insertions(+), 64 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 05b1bac..d92d9c3 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -20,14 +20,10 @@ package org.apache.beam.sdk.extensions.sql.zetasql;
 import com.google.zetasql.AnalyzerOptions;
 import com.google.zetasql.PreparedExpression;
 import com.google.zetasql.Value;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
@@ -46,15 +42,15 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSe
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rel2sql.SqlImplementor;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlDialect;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlIdentifier;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 
 /**
  * BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL}
@@ -97,19 +93,22 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
           pinput.size());
       PCollection<Row> upstream = pinput.get(0);
 
-      final List<String> projects =
-          getProgram().getProjectList().stream()
-              .map(BeamZetaSqlCalcRel.this::unparseRexNode)
-              .collect(Collectors.toList());
+      final RexBuilder rexBuilder = getCluster().getRexBuilder();
+      RexNode rex = rexBuilder.makeCall(SqlStdOperatorTable.ROW, getProgram().getProjectList());
+
       final RexNode condition = getProgram().getCondition();
+      if (condition != null) {
+        rex =
+            rexBuilder.makeCall(
+                SqlStdOperatorTable.CASE, condition, rex, rexBuilder.makeNullLiteral(getRowType()));
+      }
 
       boolean verifyRowValues =
           pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
       Schema outputSchema = CalciteUtils.toSchema(getRowType());
       CalcFn calcFn =
           new CalcFn(
-              projects,
-              condition == null ? null : unparseRexNode(condition),
+              context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql(),
               upstream.getSchema(),
               outputSchema,
               verifyRowValues);
@@ -121,32 +120,19 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
     }
   }
 
-  private String unparseRexNode(RexNode rex) {
-    return context.toSql(getProgram(), rex).toSqlString(DIALECT).getSql();
-  }
-
   /**
    * {@code CalcFn} is the executor for a {@link BeamZetaSqlCalcRel} step. The implementation is
    * based on the {@code ZetaSQL} expression evaluator.
    */
   private static class CalcFn extends DoFn<Row, Row> {
-    private final List<String> projects;
-    @Nullable private final String condition;
+    private final String sql;
     private final Schema inputSchema;
     private final Schema outputSchema;
     private final boolean verifyRowValues;
-    private transient List<PreparedExpression> projectExps;
-    @Nullable private transient PreparedExpression conditionExp;
-
-    CalcFn(
-        List<String> projects,
-        @Nullable String condition,
-        Schema inputSchema,
-        Schema outputSchema,
-        boolean verifyRowValues) {
-      Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount());
-      this.projects = ImmutableList.copyOf(projects);
-      this.condition = condition;
+    private transient PreparedExpression exp;
+
+    CalcFn(String sql, Schema inputSchema, Schema outputSchema, boolean verifyRowValues) {
+      this.sql = sql;
       this.inputSchema = inputSchema;
       this.outputSchema = outputSchema;
       this.verifyRowValues = verifyRowValues;
@@ -162,17 +148,8 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
                 inputSchema.getField(i).getType()));
       }
 
-      // TODO[BEAM-8630]: use a single PreparedExpression for all condition and projects
-      projectExps = new ArrayList<>();
-      for (String project : projects) {
-        PreparedExpression projectExp = new PreparedExpression(project);
-        projectExp.prepare(options);
-        projectExps.add(projectExp);
-      }
-      if (condition != null) {
-        conditionExp = new PreparedExpression(condition);
-        conditionExp.prepare(options);
-      }
+      exp = new PreparedExpression(sql);
+      exp.prepare(options);
     }
 
     @ProcessElement
@@ -190,33 +167,18 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
       // The map is empty because parameters in the query string have already been substituted.
       Map<String, Value> params = Collections.emptyMap();
 
-      if (conditionExp != null && !conditionExp.execute(columns, params).getBoolValue()) {
-        return;
-      }
-
-      List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount());
-      for (int i = 0; i < outputSchema.getFieldCount(); i++) {
-        // TODO[BEAM-8630]: performance optimization by bundling the gRPC calls
-        Value v = projectExps.get(i).execute(columns, params);
-        values.add(
-            ZetaSqlBeamTranslationUtils.zetaSqlValueToJavaObject(
-                v, outputSchema.getField(i).getType(), verifyRowValues));
+      Value v = exp.execute(columns, params);
+      if (!v.isNull()) {
+        Row outputRow =
+            ZetaSqlBeamTranslationUtils.zetaSqlStructValueToBeamRow(
+                v, outputSchema, verifyRowValues);
+        c.output(outputRow);
       }
-      Row outputRow =
-          verifyRowValues
-              ? Row.withSchema(outputSchema).addValues(values).build()
-              : Row.withSchema(outputSchema).attachValues(values);
-      c.output(outputRow);
     }
 
     @Teardown
     public void teardown() {
-      for (PreparedExpression projectExp : projectExps) {
-        projectExp.close();
-      }
-      if (conditionExp != null) {
-        conditionExp.close();
-      }
+      exp.close();
     }
   }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
index 61127b7..80d5c80 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java
@@ -269,7 +269,7 @@ public final class ZetaSqlBeamTranslationUtils {
         .collect(Collectors.toList());
   }
 
-  private static Row zetaSqlStructValueToBeamRow(
+  public static Row zetaSqlStructValueToBeamRow(
       Value structValue, Schema schema, boolean verifyValues) {
     List<Object> objects = new ArrayList<>(schema.getFieldCount());
     List<Value> values = structValue.getFieldList();