You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/15 18:29:31 UTC

[beam] branch master updated: [SQL] Add support for DOT expression (#4863)

This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52d40ff  [SQL] Add support for DOT expression (#4863)
52d40ff is described below

commit 52d40ff1d9686f437d6af426690d9d4db1f2bfec
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Thu Mar 15 11:29:24 2018 -0700

    [SQL] Add support for DOT expression (#4863)
    
    Enables referencing dynamic row fields by name
---
 .../sql/impl/interpreter/BeamSqlFnExecutor.java    |  4 +
 .../interpreter/operator/BeamSqlDotExpression.java | 67 ++++++++++++++++
 .../beam/sdk/extensions/sql/BeamSqlArrayTest.java  | 57 ++++++++++++++
 .../operator/BeamSqlDotExpressionTest.java         | 91 ++++++++++++++++++++++
 4 files changed, 219 insertions(+)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 3e7089a..d6307a9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDefaultExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
@@ -414,6 +415,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
         case "CARDINALITY":
           return new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName());
 
+        case "DOT":
+          return new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
+
         //DEFAULT keyword for UDF with optional parameter
         case "DEFAULT":
           return new BeamSqlDefaultExpression();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java
new file mode 100644
index 0000000..79c4645
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.SqlTypeCoder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Implements the DOT operator, which reads a field of a dynamic ROW by name.
+ *
+ * <p>Takes two operands: a ROW-typed expression and a VARCHAR/CHAR expression holding the field
+ * name. The result is typed from the field's coder in the row's {@link RowType}. A NULL row
+ * yields a NULL result of this expression's declared output type.
+ */
+public class BeamSqlDotExpression extends BeamSqlExpression {
+
+  /**
+   * Creates a DOT expression.
+   *
+   * @param operands the ROW operand followed by the field-name operand
+   * @param sqlTypeName the declared output type of this expression
+   */
+  public BeamSqlDotExpression(List<BeamSqlExpression> operands, SqlTypeName sqlTypeName) {
+    super(operands, sqlTypeName);
+  }
+
+  /** Accepts exactly two operands: a ROW followed by a VARCHAR or CHAR field name. */
+  @Override
+  public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    SqlTypeName fieldNameType = operands.get(1).getOutputType();
+    return SqlTypeName.ROW.equals(operands.get(0).getOutputType())
+        && (SqlTypeName.VARCHAR.equals(fieldNameType) || SqlTypeName.CHAR.equals(fieldNameType));
+  }
+
+  /**
+   * Evaluates the ROW operand and returns the value of the named field.
+   *
+   * @throws IllegalArgumentException if the row has no field with the given name
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    Row dynamicRow = opValueEvaluated(0, inputRow, window);
+    if (dynamicRow == null) {
+      // SQL semantics: a field of a NULL row is NULL; dereferencing it would throw an NPE.
+      return BeamSqlPrimitive.of(getOutputType(), null);
+    }
+
+    String fieldName = opValueEvaluated(1, inputRow, window);
+    SqlTypeName fieldType = getFieldType(dynamicRow, fieldName);
+
+    return BeamSqlPrimitive.of(fieldType, dynamicRow.getValue(fieldName));
+  }
+
+  /**
+   * Resolves the Calcite type of {@code fieldName} in {@code row}'s schema.
+   *
+   * @throws IllegalArgumentException if the schema has no such field
+   */
+  private SqlTypeName getFieldType(Row row, String fieldName) {
+    RowType rowType = row.getRowType();
+    int fieldIndex = rowType.indexOf(fieldName);
+
+    if (fieldIndex < 0) {
+      throw new IllegalArgumentException(
+          "Cannot find field '" + fieldName + "' in " + rowType);
+    }
+
+    // NOTE(review): assumes every field coder of a SQL row is a SqlTypeCoder.
+    SqlTypeCoder fieldCoder = (SqlTypeCoder) rowType.getFieldCoder(fieldIndex);
+    return CalciteUtils.toCalciteType(fieldCoder);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
index 8553c7c..b920186 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
@@ -327,6 +327,63 @@ public class BeamSqlArrayTest {
     pipeline.run();
   }
 
+  @Test
+  public void testSelectRowFieldFromArrayOfRows() {
+    // Schema of each element stored in the array column.
+    RowType elementRowType = RowSqlType.builder()
+        .withVarcharField("f_rowString")
+        .withIntegerField("f_rowInt")
+        .build();
+
+    // Single-column output schema for the projected nested field.
+    RowType resultRowType = RowSqlType.builder().withVarcharField("f_stringField").build();
+
+    RowType inputType = RowSqlType.builder()
+        .withIntegerField("f_int")
+        .withArrayField("f_arrayOfRows", elementRowType)
+        .build();
+
+    Row firstInput = Row.withRowType(inputType)
+        .addValues(
+            1,
+            Arrays.asList(
+                Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                Row.withRowType(elementRowType).addValues("BB", 22).build()))
+        .build();
+
+    Row secondInput = Row.withRowType(inputType)
+        .addValues(
+            2,
+            Arrays.asList(
+                Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                Row.withRowType(elementRowType).addValues("DD", 44).build()))
+        .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+            .apply(Create.of(firstInput, secondInput).withCoder(inputType.getRowCoder()));
+
+    String query = "SELECT f_arrayOfRows[1].f_rowString FROM PCOLLECTION";
+    PCollection<Row> result =
+        input.apply(BeamSql.query(query)).setCoder(resultRowType.getRowCoder());
+
+    Row expectedFromFirst = Row.withRowType(resultRowType).addValues("BB").build();
+    Row expectedFromSecond = Row.withRowType(resultRowType).addValues("DD").build();
+    PAssert.that(result).containsInAnyOrder(expectedFromFirst, expectedFromSecond);
+
+    pipeline.run();
+  }
+
   private PCollection<Row> pCollectionOf2Elements() {
     return
         PBegin
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java
new file mode 100644
index 0000000..a054e1c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlType;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlDotExpression}.
+ */
+public class BeamSqlDotExpressionTest {
+
+  private static final Row NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testReturnsFieldValue() {
+    BeamSqlDotExpression dotExpression = dotExpressionAccessing("f_string");
+
+    assertEquals("aaa", dotExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+  }
+
+  @Test
+  public void testThrowsForNonExistentField() {
+    BeamSqlDotExpression dotExpression = dotExpressionAccessing("f_nonExistent");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot find field");
+
+    dotExpression.evaluate(NULL_ROW, NULL_WINDOW);
+  }
+
+  /**
+   * Builds a DOT expression that reads {@code fieldName} from the constant row
+   * {@code ("aaa", 14)}, whose schema is {@code (f_string VARCHAR, f_int INTEGER)}.
+   */
+  private static BeamSqlDotExpression dotExpressionAccessing(String fieldName) {
+    RowType rowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_string")
+            .withIntegerField("f_int")
+            .build();
+
+    List<BeamSqlExpression> operands =
+        ImmutableList.of(
+            BeamSqlPrimitive.of(
+                SqlTypeName.ROW,
+                Row
+                    .withRowType(rowType)
+                    .addValues("aaa", 14)
+                    .build()),
+            BeamSqlPrimitive.of(SqlTypeName.VARCHAR, fieldName));
+
+    return new BeamSqlDotExpression(operands, SqlTypeName.VARCHAR);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.