You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2018/03/15 04:20:10 UTC

[beam] branch master updated: [SQL] Add support for arrays of rows (#4857)

This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d96ee6  [SQL] Add support for arrays of rows (#4857)
7d96ee6 is described below

commit 7d96ee6250ea93b4822c32ace84f2f5ea21e2da9
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Wed Mar 14 21:20:02 2018 -0700

    [SQL] Add support for arrays of rows (#4857)
    
    Support array fields containing rows
---
 .../apache/beam/sdk/extensions/sql/RowSqlType.java |   9 ++
 .../beam/sdk/extensions/sql/SqlTypeCoder.java      |   4 -
 .../beam/sdk/extensions/sql/SqlTypeCoders.java     |  16 ++-
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   |   3 +-
 .../sql/impl/interpreter/BeamSqlFnExecutor.java    |   8 +-
 .../interpreter/operator/BeamSqlPrimitive.java     |   5 +-
 .../operator/row/BeamSqlFieldAccessExpression.java |  27 ++++-
 .../extensions/sql/impl/utils/CalciteUtils.java    |  51 ++++-----
 .../beam/sdk/extensions/sql/BeamSqlArrayTest.java  | 121 +++++++++++++++++++++
 .../row/BeamSqlFieldAccessExpressionTest.java      |  91 ++++++++++++++++
 10 files changed, 290 insertions(+), 45 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
index 77eda6a..fc16c84 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
@@ -102,9 +102,18 @@ public class RowSqlType {
       return withField(fieldName, SqlTypeCoders.TIMESTAMP);
     }
 
+    /**
+     * Adds an ARRAY field with elements of {@code elementCoder}.
+     */
     public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) {
       return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder));
+    }
 
+    /**
+     * Adds an ARRAY field with elements of {@code rowType}.
+     */
+    public Builder withArrayField(String fieldName, RowType rowType) {
+      return withField(fieldName, SqlTypeCoders.arrayOf(rowType));
     }
 
     public Builder withRowField(String fieldName, RowType rowType) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
index 5b6e104..c2a6a3a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
@@ -65,10 +65,6 @@ public abstract class SqlTypeCoder extends CustomCoder<Object> {
     return this.getClass().hashCode();
   }
 
-  public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
-    return sqlTypeCoder instanceof SqlArrayCoder;
-  }
-
   static class SqlTinyIntCoder extends SqlTypeCoder {
 
     @Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
index 9eea2df..d9a132c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
@@ -58,10 +58,6 @@ public class SqlTypeCoders {
   public static final SqlTypeCoder DATE = new SqlDateCoder();
   public static final SqlTypeCoder TIMESTAMP = new SqlTimestampCoder();
 
-  public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
-    return SqlArrayCoder.of(elementCoder);
-  }
-
   public static final Set<SqlTypeCoder> NUMERIC_TYPES =
       ImmutableSet.of(
           SqlTypeCoders.TINYINT,
@@ -72,6 +68,18 @@ public class SqlTypeCoders {
           SqlTypeCoders.DOUBLE,
           SqlTypeCoders.DECIMAL);
 
+  public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
+    return SqlArrayCoder.of(elementCoder);
+  }
+
+  public static SqlTypeCoder arrayOf(RowType rowType) {
+    return SqlArrayCoder.of(rowOf(rowType));
+  }
+
+  public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
+    return sqlTypeCoder instanceof SqlArrayCoder;
+  }
+
   public static boolean isRow(SqlTypeCoder sqlTypeCoder) {
     return sqlTypeCoder instanceof SqlRowCoder;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index e81b927..7bd04f2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -169,8 +169,7 @@ public class BeamSqlEnv implements Serializable {
 
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRowType(this.beamRowType)
-          .apply(BeamQueryPlanner.TYPE_FACTORY);
+      return CalciteUtils.toCalciteRowType(this.beamRowType, BeamQueryPlanner.TYPE_FACTORY);
     }
 
     @Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 5a47aa4..3e7089a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -202,10 +202,14 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
       ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
     } else if (rexNode instanceof RexFieldAccess) {
       RexFieldAccess fieldAccessNode = (RexFieldAccess) rexNode;
-      int rowFieldIndex = ((RexInputRef) fieldAccessNode.getReferenceExpr()).getIndex();
+      BeamSqlExpression referenceExpression = buildExpression(fieldAccessNode.getReferenceExpr());
       int nestedFieldIndex = fieldAccessNode.getField().getIndex();
       SqlTypeName nestedFieldType = fieldAccessNode.getField().getType().getSqlTypeName();
-      ret = new BeamSqlFieldAccessExpression(rowFieldIndex, nestedFieldIndex, nestedFieldType);
+
+      ret = new BeamSqlFieldAccessExpression(
+          referenceExpression,
+          nestedFieldIndex,
+          nestedFieldType);
     } else if (rexNode instanceof RexCall) {
       RexCall node = (RexCall) rexNode;
       String opName = node.op.getName();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 544734a..2efbc7f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -145,8 +145,11 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression {
       return true;
     case ARRAY:
       return value instanceof List;
+    case ROW:
+      return value instanceof Row;
     default:
-      throw new UnsupportedOperationException(outputType.name());
+      throw new UnsupportedOperationException(
+          "Unsupported Beam SQL type in expression: " + outputType.name());
     }
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
index 478b4e3..5027086 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
 
 import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -29,16 +31,16 @@ import org.apache.calcite.sql.type.SqlTypeName;
  */
 public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
 
-  private int rowFieldIndex;
+  private BeamSqlExpression referenceExpression;
   private int nestedFieldIndex;
 
   public BeamSqlFieldAccessExpression(
-      int rowFieldIndex,
+      BeamSqlExpression referenceExpression,
       int nestedFieldIndex,
       SqlTypeName nestedFieldType) {
 
     super(Collections.emptyList(), nestedFieldType);
-    this.rowFieldIndex = rowFieldIndex;
+    this.referenceExpression = referenceExpression;
     this.nestedFieldIndex = nestedFieldIndex;
   }
 
@@ -49,7 +51,22 @@ public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
 
   @Override
   public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
-    Row nestedRow = inputRow.getValue(rowFieldIndex);
-    return BeamSqlPrimitive.of(outputType, nestedRow.getValue(nestedFieldIndex));
+    BeamSqlPrimitive targetObject = referenceExpression.evaluate(inputRow, window);
+    SqlTypeName targetFieldType = targetObject.getOutputType();
+
+    Object targetFieldValue;
+
+    if (SqlTypeName.ARRAY.equals(targetFieldType)) {
+      targetFieldValue = ((List) targetObject.getValue()).get(nestedFieldIndex);
+    } else if (SqlTypeName.ROW.equals(targetFieldType)) {
+      targetFieldValue = ((Row) targetObject.getValue()).getValue(nestedFieldIndex);
+    } else {
+      throw new IllegalArgumentException(
+          "Attempt to access field of unsupported type "
+          + targetFieldType.getClass().getSimpleName()
+          + ". Field access operator is only supported for arrays or rows");
+    }
+
+    return BeamSqlPrimitive.of(outputType, targetFieldValue);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index eccbed8..d8e93f9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.extensions.sql.impl.utils;
 
 import static org.apache.beam.sdk.values.RowType.toRowType;
@@ -30,7 +31,6 @@ import org.apache.beam.sdk.values.RowType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -68,7 +68,7 @@ public class CalciteUtils {
    * for supported Beam SQL type coder, see {@link SqlTypeCoder}.
    */
   public static SqlTypeName toCalciteType(SqlTypeCoder coder) {
-    if (SqlTypeCoder.isArray(coder)) {
+    if (SqlTypeCoders.isArray(coder)) {
         return SqlTypeName.ARRAY;
     }
 
@@ -135,19 +135,16 @@ public class CalciteUtils {
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a table.
    */
-  public static RelProtoDataType toCalciteRowType(final RowType rowType) {
-    return dataTypeFactory -> {
-      RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory);
-
-      IntStream
-          .range(0, rowType.getFieldCount())
-          .forEach(idx ->
-                       builder.add(
-                           rowType.getFieldName(idx),
-                           toRelDataType(dataTypeFactory, rowType, idx)));
-
-      return builder.build();
-    };
+  public static RelDataType toCalciteRowType(RowType rowType, RelDataTypeFactory dataTypeFactory) {
+    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(dataTypeFactory);
+
+    IntStream
+        .range(0, rowType.getFieldCount())
+        .forEach(idx ->
+                     builder.add(
+                         rowType.getFieldName(idx),
+                         toRelDataType(dataTypeFactory, rowType, idx)));
+    return builder.build();
   }
 
   private static RelDataType toRelDataType(
@@ -163,7 +160,7 @@ public class CalciteUtils {
     }
 
     if (SqlTypeName.ROW.equals(typeName)) {
-      return createRowRelType(dataTypeFactory, (SqlRowCoder) fieldCoder);
+      return toCalciteRowType(((SqlRowCoder) fieldCoder).getRowType(), dataTypeFactory);
     }
 
     return dataTypeFactory.createSqlType(typeName);
@@ -172,18 +169,18 @@ public class CalciteUtils {
   private static RelDataType createArrayRelType(
       RelDataTypeFactory dataTypeFactory,
       SqlArrayCoder arrayFieldCoder) {
-    SqlTypeName elementType = toCalciteType(arrayFieldCoder.getElementCoder());
-    return
-        dataTypeFactory
-            .createArrayType(
-                dataTypeFactory.createSqlType(elementType), UNLIMITED_ARRAY_SIZE);
-  }
 
-  private static RelDataType createRowRelType(
-      RelDataTypeFactory dataTypeFactory,
-      SqlRowCoder rowFieldCoder) {
+    SqlTypeName elementTypeName = toCalciteType(arrayFieldCoder.getElementCoder());
+
+    RelDataType elementType;
+
+    if (SqlTypeName.ROW.equals(elementTypeName)) {
+      RowType rowType = ((SqlRowCoder) arrayFieldCoder.getElementCoder()).getRowType();
+      elementType = toCalciteRowType(rowType, dataTypeFactory);
+    } else {
+      elementType = dataTypeFactory.createSqlType(elementTypeName);
+    }
 
-    RelProtoDataType relProtoDataType = toCalciteRowType(rowFieldCoder.getRowType());
-    return relProtoDataType.apply(dataTypeFactory);
+    return dataTypeFactory.createArrayType(elementType, UNLIMITED_ARRAY_SIZE);
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
index 295ec5f..8553c7c 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
@@ -206,6 +206,127 @@ public class BeamSqlArrayTest {
     pipeline.run();
   }
 
+  @Test
+  public void testSelectRowsFromArrayOfRows() {
+    RowType elementRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_rowString")
+            .withIntegerField("f_rowInt")
+            .build();
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withArrayField("f_resultArray", elementRowType)
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withArrayField("f_arrayOfRows", elementRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Arrays.asList(
+                                 Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                                 Row.withRowType(elementRowType).addValues("BB", 22).build()))
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Arrays.asList(
+                                 Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                                 Row.withRowType(elementRowType).addValues("DD", 44).build()))
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT f_arrayOfRows FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert.that(result)
+           .containsInAnyOrder(
+               Row.withRowType(resultRowType)
+                  .addArray(
+                      Arrays.asList(
+                          Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                          Row.withRowType(elementRowType).addValues("BB", 22).build()))
+                  .build(),
+               Row.withRowType(resultRowType)
+                  .addArray(
+                      Arrays.asList(
+                          Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                          Row.withRowType(elementRowType).addValues("DD", 44).build()))
+                  .build()
+           );
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testSelectSingleRowFromArrayOfRows() {
+    RowType elementRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_rowString")
+            .withIntegerField("f_rowInt")
+            .build();
+
+    RowType resultRowType = elementRowType;
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withArrayField("f_arrayOfRows", elementRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Arrays.asList(
+                                 Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                                 Row.withRowType(elementRowType).addValues("BB", 22).build()))
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Arrays.asList(
+                                 Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                                 Row.withRowType(elementRowType).addValues("DD", 44).build()))
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT f_arrayOfRows[1] FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert.that(result)
+           .containsInAnyOrder(
+               Row.withRowType(elementRowType).addValues("BB", 22).build(),
+               Row.withRowType(elementRowType).addValues("DD", 44).build());
+
+    pipeline.run();
+  }
+
   private PCollection<Row> pCollectionOf2Elements() {
     return
         PBegin
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
new file mode 100644
index 0000000..ce971c3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlType;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlFieldAccessExpression}.
+ */
+public class BeamSqlFieldAccessExpressionTest {
+
+  private static final Row NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testAccessesFieldOfArray() {
+    BeamSqlPrimitive<List<String>> targetArray =
+        BeamSqlPrimitive.of(
+            SqlTypeName.ARRAY,
+            Arrays.asList("aaa", "bbb", "ccc"));
+
+    BeamSqlFieldAccessExpression arrayExpression =
+        new BeamSqlFieldAccessExpression(targetArray, 1, SqlTypeName.VARCHAR);
+
+    assertEquals("bbb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+  }
+
+  @Test
+  public void testAccessesFieldOfRow() {
+    RowType rowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_string1")
+            .withVarcharField("f_string2")
+            .withVarcharField("f_string3")
+            .build();
+
+    BeamSqlPrimitive<Row> targetRow =
+        BeamSqlPrimitive.of(
+            SqlTypeName.ROW,
+            Row
+                .withRowType(rowType)
+                .addValues("aa", "bb", "cc")
+                .build());
+
+    BeamSqlFieldAccessExpression arrayExpression =
+        new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR);
+
+    assertEquals("bb", arrayExpression.evaluate(NULL_ROW, NULL_WINDOW).getValue());
+  }
+
+  @Test
+  public void testThrowsForUnsupportedType() {
+    BeamSqlPrimitive<Integer> targetRow = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unsupported type");
+
+    new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR)
+        .evaluate(NULL_ROW, NULL_WINDOW).getValue();
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.