You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/15 18:29:31 UTC
[beam] branch master updated: [SQL] Add support for DOT expression
(#4863)
This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 52d40ff [SQL] Add support for DOT expression (#4863)
52d40ff is described below
commit 52d40ff1d9686f437d6af426690d9d4db1f2bfec
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Thu Mar 15 11:29:24 2018 -0700
[SQL] Add support for DOT expression (#4863)
Enables referencing dynamic row fields by name
---
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 4 +
.../interpreter/operator/BeamSqlDotExpression.java | 67 ++++++++++++++++
.../beam/sdk/extensions/sql/BeamSqlArrayTest.java | 57 ++++++++++++++
.../operator/BeamSqlDotExpressionTest.java | 91 ++++++++++++++++++++++
4 files changed, 219 insertions(+)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 3e7089a..d6307a9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDefaultExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
@@ -414,6 +415,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
case "CARDINALITY":
return new BeamSqlCardinalityExpression(subExps, node.type.getSqlTypeName());
+ case "DOT":
+ return new BeamSqlDotExpression(subExps, node.type.getSqlTypeName());
+
//DEFAULT keyword for UDF with optional parameter
case "DEFAULT":
return new BeamSqlDefaultExpression();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java
new file mode 100644
index 0000000..79c4645
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpression.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.SqlTypeCoder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Implements DOT operator to access fields of dynamic ROWs.
+ */
+public class BeamSqlDotExpression extends BeamSqlExpression {
+
+ public BeamSqlDotExpression(List<BeamSqlExpression> operands, SqlTypeName sqlTypeName) {
+ super(operands, sqlTypeName);
+ }
+
+ @Override
+ public boolean accept() {
+ return
+ operands.size() == 2
+ && SqlTypeName.ROW.equals(operands.get(0).getOutputType())
+ && (SqlTypeName.VARCHAR.equals(operands.get(1).getOutputType())
+ || SqlTypeName.CHAR.equals(operands.get(1).getOutputType()));
+ }
+
+ @Override
+ public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+ Row dynamicRow = opValueEvaluated(0, inputRow, window);
+ String fieldName = opValueEvaluated(1, inputRow, window);
+ SqlTypeName fieldType = getFieldType(dynamicRow, fieldName);
+
+ return BeamSqlPrimitive.of(fieldType, dynamicRow.getValue(fieldName));
+ }
+
+ private SqlTypeName getFieldType(Row row, String fieldName) {
+ RowType rowType = row.getRowType();
+ int fieldIndex = rowType.indexOf(fieldName);
+
+ if (fieldIndex < 0) {
+ throw new IllegalArgumentException(
+ "Cannot find field '" + fieldName + "' in " + row.getRowType());
+ }
+
+ SqlTypeCoder fieldCoder = (SqlTypeCoder) row.getRowType().getFieldCoder(fieldIndex);
+ return CalciteUtils.toCalciteType(fieldCoder);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
index 8553c7c..b920186 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
@@ -327,6 +327,63 @@ public class BeamSqlArrayTest {
pipeline.run();
}
+ @Test
+ public void testSelectRowFieldFromArrayOfRows() {
+ RowType elementRowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_rowString")
+ .withIntegerField("f_rowInt")
+ .build();
+
+ RowType resultRowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_stringField")
+ .build();
+
+ RowType inputType =
+ RowSqlType
+ .builder()
+ .withIntegerField("f_int")
+ .withArrayField("f_arrayOfRows", elementRowType)
+ .build();
+
+ PCollection<Row> input =
+ PBegin.in(pipeline)
+ .apply(
+ Create.of(
+ Row.withRowType(inputType)
+ .addValues(
+ 1,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("AA", 11).build(),
+ Row.withRowType(elementRowType).addValues("BB", 22).build()))
+ .build(),
+ Row.withRowType(inputType)
+ .addValues(
+ 2,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("CC", 33).build(),
+ Row.withRowType(elementRowType).addValues("DD", 44).build()))
+ .build())
+ .withCoder(inputType.getRowCoder()));
+
+ PCollection<Row> result =
+ input
+ .apply(
+ BeamSql.query(
+ "SELECT f_arrayOfRows[1].f_rowString FROM PCOLLECTION"))
+ .setCoder(resultRowType.getRowCoder());
+
+ PAssert.that(result)
+ .containsInAnyOrder(
+ Row.withRowType(resultRowType).addValues("BB").build(),
+ Row.withRowType(resultRowType).addValues("DD").build());
+
+ pipeline.run();
+ }
+
private PCollection<Row> pCollectionOf2Elements() {
return
PBegin
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java
new file mode 100644
index 0000000..a054e1c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDotExpressionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlType;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlDotExpression}.
+ */
+public class BeamSqlDotExpressionTest {
+
+ private static final Row NULL_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testReturnsFieldValue() {
+ RowType rowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_string")
+ .withIntegerField("f_int")
+ .build();
+
+ List<BeamSqlExpression> elements =
+ ImmutableList.of(
+ BeamSqlPrimitive.of(
+ SqlTypeName.ROW,
+ Row
+ .withRowType(rowType)
+ .addValues("aaa", 14)
+ .build()),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "f_string"));
+
+ BeamSqlDotExpression arrayExpression = new BeamSqlDotExpression(elements, SqlTypeName.VARCHAR);
+
+ assertEquals("aaa", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+ }
+
+ @Test
+ public void testThrowsForNonExistentField() {
+ RowType rowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_string")
+ .withIntegerField("f_int")
+ .build();
+
+ List<BeamSqlExpression> elements =
+ ImmutableList.of(
+ BeamSqlPrimitive.of(
+ SqlTypeName.ROW,
+ Row
+ .withRowType(rowType)
+ .addValues("aaa", 14)
+ .build()),
+ BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "f_nonExistent"));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Cannot find field");
+
+ new BeamSqlDotExpression(elements, SqlTypeName.VARCHAR).evaluate(NULL_ROW, NULL_WINDOW);
+ }
+}
--
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.