You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/15 04:20:10 UTC
[beam] branch master updated: [SQL] Add support for arrays of rows
(#4857)
This is an automated email from the ASF dual-hosted git repository.
mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d96ee6 [SQL] Add support for arrays of rows (#4857)
7d96ee6 is described below
commit 7d96ee6250ea93b4822c32ace84f2f5ea21e2da9
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Wed Mar 14 21:20:02 2018 -0700
[SQL] Add support for arrays of rows (#4857)
Support array fields containing rows
---
.../apache/beam/sdk/extensions/sql/RowSqlType.java | 9 ++
.../beam/sdk/extensions/sql/SqlTypeCoder.java | 4 -
.../beam/sdk/extensions/sql/SqlTypeCoders.java | 16 ++-
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 3 +-
.../sql/impl/interpreter/BeamSqlFnExecutor.java | 8 +-
.../interpreter/operator/BeamSqlPrimitive.java | 5 +-
.../operator/row/BeamSqlFieldAccessExpression.java | 27 ++++-
.../extensions/sql/impl/utils/CalciteUtils.java | 51 ++++-----
.../beam/sdk/extensions/sql/BeamSqlArrayTest.java | 121 +++++++++++++++++++++
.../row/BeamSqlFieldAccessExpressionTest.java | 91 ++++++++++++++++
10 files changed, 290 insertions(+), 45 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
index 77eda6a..fc16c84 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
@@ -102,9 +102,18 @@ public class RowSqlType {
return withField(fieldName, SqlTypeCoders.TIMESTAMP);
}
+ /**
+ * Adds an ARRAY field with elements of {@code elementCoder}.
+ */
public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) {
return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder));
+ }
+ /**
+ * Adds an ARRAY field with elements of {@code rowType}.
+ */
+ public Builder withArrayField(String fieldName, RowType rowType) {
+ return withField(fieldName, SqlTypeCoders.arrayOf(rowType));
}
public Builder withRowField(String fieldName, RowType rowType) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
index 5b6e104..c2a6a3a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
@@ -65,10 +65,6 @@ public abstract class SqlTypeCoder extends CustomCoder<Object> {
return this.getClass().hashCode();
}
- public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
- return sqlTypeCoder instanceof SqlArrayCoder;
- }
-
static class SqlTinyIntCoder extends SqlTypeCoder {
@Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
index 9eea2df..d9a132c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
@@ -58,10 +58,6 @@ public class SqlTypeCoders {
public static final SqlTypeCoder DATE = new SqlDateCoder();
public static final SqlTypeCoder TIMESTAMP = new SqlTimestampCoder();
- public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
- return SqlArrayCoder.of(elementCoder);
- }
-
public static final Set<SqlTypeCoder> NUMERIC_TYPES =
ImmutableSet.of(
SqlTypeCoders.TINYINT,
@@ -72,6 +68,18 @@ public class SqlTypeCoders {
SqlTypeCoders.DOUBLE,
SqlTypeCoders.DECIMAL);
+ public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
+ return SqlArrayCoder.of(elementCoder);
+ }
+
+ public static SqlTypeCoder arrayOf(RowType rowType) {
+ return SqlArrayCoder.of(rowOf(rowType));
+ }
+
+ public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
+ return sqlTypeCoder instanceof SqlArrayCoder;
+ }
+
public static boolean isRow(SqlTypeCoder sqlTypeCoder) {
return sqlTypeCoder instanceof SqlRowCoder;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index e81b927..7bd04f2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -169,8 +169,7 @@ public class BeamSqlEnv implements Serializable {
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.toCalciteRowType(this.beamRowType)
- .apply(BeamQueryPlanner.TYPE_FACTORY);
+ return CalciteUtils.toCalciteRowType(this.beamRowType, BeamQueryPlanner.TYPE_FACTORY);
}
@Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 5a47aa4..3e7089a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -202,10 +202,14 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
} else if (rexNode instanceof RexFieldAccess) {
RexFieldAccess fieldAccessNode = (RexFieldAccess) rexNode;
- int rowFieldIndex = ((RexInputRef) fieldAccessNode.getReferenceExpr()).getIndex();
+ BeamSqlExpression referenceExpression = buildExpression(fieldAccessNode.getReferenceExpr());
int nestedFieldIndex = fieldAccessNode.getField().getIndex();
SqlTypeName nestedFieldType = fieldAccessNode.getField().getType().getSqlTypeName();
- ret = new BeamSqlFieldAccessExpression(rowFieldIndex, nestedFieldIndex, nestedFieldType);
+
+ ret = new BeamSqlFieldAccessExpression(
+ referenceExpression,
+ nestedFieldIndex,
+ nestedFieldType);
} else if (rexNode instanceof RexCall) {
RexCall node = (RexCall) rexNode;
String opName = node.op.getName();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 544734a..2efbc7f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -145,8 +145,11 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
return true;
case ARRAY:
return value instanceof List;
+ case ROW:
+ return value instanceof Row;
default:
- throw new UnsupportedOperationException(outputType.name());
+ throw new UnsupportedOperationException(
+ "Unsupported Beam SQL type in expression: " + outputType.name());
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
index 478b4e3..5027086 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -29,16 +31,16 @@ import org.apache.calcite.sql.type.SqlTypeName;
*/
public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
- private int rowFieldIndex;
+ private BeamSqlExpression referenceExpression;
private int nestedFieldIndex;
public BeamSqlFieldAccessExpression(
- int rowFieldIndex,
+ BeamSqlExpression referenceExpression,
int nestedFieldIndex,
SqlTypeName nestedFieldType) {
super(Collections.emptyList(), nestedFieldType);
- this.rowFieldIndex = rowFieldIndex;
+ this.referenceExpression = referenceExpression;
this.nestedFieldIndex = nestedFieldIndex;
}
@@ -49,7 +51,22 @@ public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
@Override
public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
- Row nestedRow = inputRow.getValue(rowFieldIndex);
- return BeamSqlPrimitive.of(outputType, nestedRow.getValue(nestedFieldIndex));
+ BeamSqlPrimitive targetObject = referenceExpression.evaluate(inputRow, window);
+ SqlTypeName targetFieldType = targetObject.getOutputType();
+
+ Object targetFieldValue;
+
+ if (SqlTypeName.ARRAY.equals(targetFieldType)) {
+ targetFieldValue = ((List) targetObject.getValue()).get(nestedFieldIndex);
+ } else if (SqlTypeName.ROW.equals(targetFieldType)) {
+ targetFieldValue = ((Row) targetObject.getValue()).getValue(nestedFieldIndex);
+ } else {
+ throw new IllegalArgumentException(
+ "Attempt to access field of unsupported type "
+ + targetFieldType // the enum value itself (e.g. INTEGER), not its class name
+ + ". Field access operator is only supported for arrays or rows");
+ }
+
+ return BeamSqlPrimitive.of(outputType, targetFieldValue);
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index eccbed8..d8e93f9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.beam.sdk.extensions.sql.impl.utils;
import static org.apache.beam.sdk.values.RowType.toRowType;
@@ -30,7 +31,6 @@ import org.apache.beam.sdk.values.RowType;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;
/**
@@ -68,7 +68,7 @@ public class CalciteUtils {
* for supported Beam SQL type coder, see {@link SqlTypeCoder}.
*/
public static SqlTypeName toCalciteType(SqlTypeCoder coder) {
- if (SqlTypeCoder.isArray(coder)) {
+ if (SqlTypeCoders.isArray(coder)) {
return SqlTypeName.ARRAY;
}
@@ -135,19 +135,16 @@ public class CalciteUtils {
/**
* Create an instance of {@code RelDataType} so it can be used to create a table.
*/
- public static RelProtoDataType toCalciteRowType(final RowType rowType) {
- return dataTypeFactory -> {
- RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory);
-
- IntStream
- .range(0, rowType.getFieldCount())
- .forEach(idx ->
- builder.add(
- rowType.getFieldName(idx),
- toRelDataType(dataTypeFactory, rowType, idx)));
-
- return builder.build();
- };
+ public static RelDataType toCalciteRowType(RowType rowType, RelDataTypeFactory dataTypeFactory) {
+ RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory);
+
+ IntStream
+ .range(0, rowType.getFieldCount())
+ .forEach(idx ->
+ builder.add(
+ rowType.getFieldName(idx),
+ toRelDataType(dataTypeFactory, rowType, idx)));
+ return builder.build();
}
private static RelDataType toRelDataType(
@@ -163,7 +160,7 @@ public class CalciteUtils {
}
if (SqlTypeName.ROW.equals(typeName)) {
- return createRowRelType(dataTypeFactory, (SqlRowCoder) fieldCoder);
+ return toCalciteRowType(((SqlRowCoder) fieldCoder).getRowType(), dataTypeFactory);
}
return dataTypeFactory.createSqlType(typeName);
@@ -172,18 +169,18 @@ public class CalciteUtils {
private static RelDataType createArrayRelType(
RelDataTypeFactory dataTypeFactory,
SqlArrayCoder arrayFieldCoder) {
- SqlTypeName elementType = toCalciteType(arrayFieldCoder.getElementCoder());
- return
- dataTypeFactory
- .createArrayType(
- dataTypeFactory.createSqlType(elementType), UNLIMITED_ARRAY_SIZE);
- }
- private static RelDataType createRowRelType(
- RelDataTypeFactory dataTypeFactory,
- SqlRowCoder rowFieldCoder) {
+ SqlTypeName elementTypeName = toCalciteType(arrayFieldCoder.getElementCoder());
+
+ RelDataType elementType;
+
+ if (SqlTypeName.ROW.equals(elementTypeName)) {
+ RowType rowType = ((SqlRowCoder) arrayFieldCoder.getElementCoder()).getRowType();
+ elementType = toCalciteRowType(rowType, dataTypeFactory);
+ } else {
+ elementType = dataTypeFactory.createSqlType(elementTypeName);
+ }
- RelProtoDataType relProtoDataType = toCalciteRowType(rowFieldCoder.getRowType());
- return relProtoDataType.apply(dataTypeFactory);
+ return dataTypeFactory.createArrayType(elementType, UNLIMITED_ARRAY_SIZE);
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
index 295ec5f..8553c7c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
@@ -206,6 +206,127 @@ public class BeamSqlArrayTest {
pipeline.run();
}
+ @Test
+ public void testSelectRowsFromArrayOfRows() {
+ RowType elementRowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_rowString")
+ .withIntegerField("f_rowInt")
+ .build();
+
+ RowType resultRowType =
+ RowSqlType
+ .builder()
+ .withArrayField("f_resultArray", elementRowType)
+ .build();
+
+ RowType inputType =
+ RowSqlType
+ .builder()
+ .withIntegerField("f_int")
+ .withArrayField("f_arrayOfRows", elementRowType)
+ .build();
+
+ PCollection<Row> input =
+ PBegin.in(pipeline)
+ .apply(
+ Create.of(
+ Row.withRowType(inputType)
+ .addValues(
+ 1,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("AA", 11).build(),
+ Row.withRowType(elementRowType).addValues("BB", 22).build()))
+ .build(),
+ Row.withRowType(inputType)
+ .addValues(
+ 2,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("CC", 33).build(),
+ Row.withRowType(elementRowType).addValues("DD", 44).build()))
+ .build())
+ .withCoder(inputType.getRowCoder()));
+
+ PCollection<Row> result =
+ input
+ .apply(
+ BeamSql.query(
+ "SELECT f_arrayOfRows FROM PCOLLECTION"))
+ .setCoder(resultRowType.getRowCoder());
+
+ PAssert.that(result)
+ .containsInAnyOrder(
+ Row.withRowType(resultRowType)
+ .addArray(
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("AA", 11).build(),
+ Row.withRowType(elementRowType).addValues("BB", 22).build()))
+ .build(),
+ Row.withRowType(resultRowType)
+ .addArray(
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("CC", 33).build(),
+ Row.withRowType(elementRowType).addValues("DD", 44).build()))
+ .build()
+ );
+
+ pipeline.run();
+ }
+
+ @Test
+ public void testSelectSingleRowFromArrayOfRows() {
+ RowType elementRowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_rowString")
+ .withIntegerField("f_rowInt")
+ .build();
+
+ RowType resultRowType = elementRowType;
+
+ RowType inputType =
+ RowSqlType
+ .builder()
+ .withIntegerField("f_int")
+ .withArrayField("f_arrayOfRows", elementRowType)
+ .build();
+
+ PCollection<Row> input =
+ PBegin.in(pipeline)
+ .apply(
+ Create.of(
+ Row.withRowType(inputType)
+ .addValues(
+ 1,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("AA", 11).build(),
+ Row.withRowType(elementRowType).addValues("BB", 22).build()))
+ .build(),
+ Row.withRowType(inputType)
+ .addValues(
+ 2,
+ Arrays.asList(
+ Row.withRowType(elementRowType).addValues("CC", 33).build(),
+ Row.withRowType(elementRowType).addValues("DD", 44).build()))
+ .build())
+ .withCoder(inputType.getRowCoder()));
+
+ PCollection<Row> result =
+ input
+ .apply(
+ BeamSql.query(
+ "SELECT f_arrayOfRows[1] FROM PCOLLECTION"))
+ .setCoder(resultRowType.getRowCoder());
+
+ PAssert.that(result)
+ .containsInAnyOrder(
+ Row.withRowType(elementRowType).addValues("BB", 22).build(),
+ Row.withRowType(elementRowType).addValues("DD", 44).build());
+
+ pipeline.run();
+ }
+
private PCollection<Row> pCollectionOf2Elements() {
return
PBegin
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
new file mode 100644
index 0000000..ce971c3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlFieldAccessExpression}.
+ */
+public class BeamSqlFieldAccessExpressionTest {
+
+ private static final Row NULL_ROW = null;
+ private static final BoundedWindow NULL_WINDOW = null;
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testAccessesFieldOfArray() {
+ BeamSqlPrimitive<List<String>> targetArray =
+ BeamSqlPrimitive.of(
+ SqlTypeName.ARRAY,
+ Arrays.asList("aaa", "bbb", "ccc"));
+
+ BeamSqlFieldAccessExpression arrayExpression =
+ new BeamSqlFieldAccessExpression(targetArray, 1, SqlTypeName.VARCHAR);
+
+ assertEquals("bbb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+ }
+
+ @Test
+ public void testAccessesFieldOfRow() {
+ RowType rowType =
+ RowSqlType
+ .builder()
+ .withVarcharField("f_string1")
+ .withVarcharField("f_string2")
+ .withVarcharField("f_string3")
+ .build();
+
+ BeamSqlPrimitive<Row> targetRow =
+ BeamSqlPrimitive.of(
+ SqlTypeName.ROW,
+ Row
+ .withRowType(rowType)
+ .addValues("aa", "bb", "cc")
+ .build());
+
+ BeamSqlFieldAccessExpression arrayExpression =
+ new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR);
+
+ assertEquals("bb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+ }
+
+ @Test
+ public void testThrowsForUnsupportedType() {
+ BeamSqlPrimitive<Integer> targetRow = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("unsupported type");
+
+ new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR)
+ .evaluate(NULL_ROW, NULL_WINDOW).getValue();
+ }
+}
--
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.