You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ap...@apache.org on 2020/01/10 23:03:44 UTC

[beam] branch num created (now a3b0f9a)

This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a change to branch num
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at a3b0f9a  [BEAM-8630] Use column numbers for BeamZetaSqlCalcRel

This branch includes the following new commits:

     new a3b0f9a  [BEAM-8630] Use column numbers for BeamZetaSqlCalcRel

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-8630] Use column numbers for BeamZetaSqlCalcRel

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a commit to branch num
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a3b0f9a4456eaf750053f19b075ac317531bb4c4
Author: Andrew Pilloud <ap...@google.com>
AuthorDate: Wed Jan 8 15:14:59 2020 -0800

    [BEAM-8630] Use column numbers for BeamZetaSqlCalcRel
---
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 33 +++++++++-------------
 1 file changed, 14 insertions(+), 19 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 330fb2d..153df25 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamSqlUnparseContext;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -67,13 +66,14 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
   private static final SqlDialect DIALECT = BeamBigQuerySqlDialect.DEFAULT;
   private final SqlImplementor.Context context;
 
+  private static String columnName(int i) {
+    return "_" + i;
+  }
+
   public BeamZetaSqlCalcRel(
       RelOptCluster cluster, RelTraitSet traits, RelNode input, RexProgram program) {
     super(cluster, traits, input, program);
-    final IntFunction<SqlNode> fn =
-        i ->
-            new SqlIdentifier(
-                getProgram().getInputRowType().getFieldList().get(i).getName(), SqlParserPos.ZERO);
+    final IntFunction<SqlNode> fn = i -> new SqlIdentifier(columnName(i), SqlParserPos.ZERO);
     context = new BeamSqlUnparseContext(fn);
   }
 
@@ -146,20 +146,21 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
     @Setup
     public void setup() {
       AnalyzerOptions options = SqlAnalyzer.initAnalyzerOptions();
-      for (Field field : inputSchema.getFields()) {
+      for (int i = 0; i < inputSchema.getFieldCount(); i++) {
         options.addExpressionColumn(
-            sanitize(field.getName()), ZetaSqlUtils.beamFieldTypeToZetaSqlType(field.getType()));
+            columnName(i),
+            ZetaSqlUtils.beamFieldTypeToZetaSqlType(inputSchema.getField(i).getType()));
       }
 
       // TODO[BEAM-8630]: use a single PreparedExpression for all condition and projects
       projectExps = new ArrayList<>();
       for (String project : projects) {
-        PreparedExpression projectExp = new PreparedExpression(sanitize(project));
+        PreparedExpression projectExp = new PreparedExpression(project);
         projectExp.prepare(options);
         projectExps.add(projectExp);
       }
       if (condition != null) {
-        conditionExp = new PreparedExpression(sanitize(condition));
+        conditionExp = new PreparedExpression(condition);
         conditionExp.prepare(options);
       }
     }
@@ -168,10 +169,11 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
     public void processElement(ProcessContext c) {
       Map<String, Value> columns = new HashMap<>();
       Row row = c.element();
-      for (Field field : inputSchema.getFields()) {
+      for (int i = 0; i < inputSchema.getFieldCount(); i++) {
         columns.put(
-            sanitize(field.getName()),
-            ZetaSqlUtils.javaObjectToZetaSqlValue(row.getValue(field.getName()), field.getType()));
+            columnName(i),
+            ZetaSqlUtils.javaObjectToZetaSqlValue(
+                row.getValue(i), inputSchema.getField(i).getType()));
       }
 
       // TODO[BEAM-8630]: support parameters in expression evaluation
@@ -201,12 +203,5 @@ public class BeamZetaSqlCalcRel extends AbstractBeamCalcRel {
         conditionExp.close();
       }
     }
-
-    // Replaces "$" with "_" because "$" is not allowed in a valid ZetaSQL identifier
-    // (ZetaSQL identifier syntax: [A-Za-z_][A-Za-z_0-9]*)
-    // TODO[BEAM-8630]: check if this is sufficient and correct, or even better fix this in Calcite
-    private static String sanitize(String identifier) {
-      return identifier.replaceAll("\\$", "_");
-    }
   }
 }