You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/29 04:23:36 UTC

[beam] 02/02: [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder

This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2eb7de0fe6e96da9805fc827294da1e1329ff716
Author: Anton Kedin <ke...@google.com>
AuthorDate: Wed Nov 22 19:17:21 2017 -0800

    [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder
    
    To improve readability of creating BeamRecordSqlTypes.
---
 .../beam/sdk/extensions/sql/BeamRecordSqlType.java |  85 ++++++++++++++-
 .../sdk/extensions/sql/BeamRecordSqlTypeTest.java  | 115 +++++++++++++++++++++
 ...BeamSqlBuiltinFunctionsIntegrationTestBase.java |  56 +++++-----
 3 files changed, 229 insertions(+), 27 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 1784ec1..a6b23b6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
 import java.sql.Types;
@@ -108,8 +109,8 @@ public class BeamRecordSqlType extends BeamRecordType {
       Integer fieldType = fieldTypes.get(idx);
 
       if (!CODERS.containsKey(fieldType)) {
-          throw new UnsupportedOperationException(
-              "Data type: " + fieldType + " not supported yet!");
+        throw new UnsupportedOperationException(
+            "Data type: " + fieldType + " not supported yet!");
       }
 
       fieldCoders.add(CODERS.get(fieldType));
@@ -166,4 +167,84 @@ public class BeamRecordSqlType extends BeamRecordType {
     return "BeamRecordSqlType [fieldNames=" + getFieldNames()
         + ", fieldTypes=" + fieldTypes + "]";
   }
+
+  /** Creates an empty {@link Builder}. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Fluent builder for {@link BeamRecordSqlType}; fields keep their insertion order. */
+  public static class Builder {
+    private final ImmutableList.Builder<String> fieldNames = ImmutableList.builder();
+    private final ImmutableList.Builder<Integer> fieldTypes = ImmutableList.builder();
+
+    private Builder() {}
+
+    /** Adds a field; typed with*Field methods delegate here. Nulls fail before any change. */
+    public Builder withField(String fieldName, Integer fieldType) {
+      if (fieldName == null || fieldType == null) { // validate first: keep the lists aligned
+        throw new NullPointerException("fieldName and fieldType must not be null");
+      }
+      fieldNames.add(fieldName);
+      fieldTypes.add(fieldType);
+      return this;
+    }
+
+    public Builder withTinyIntField(String fieldName) {
+      return withField(fieldName, Types.TINYINT);
+    }
+
+    public Builder withSmallIntField(String fieldName) {
+      return withField(fieldName, Types.SMALLINT);
+    }
+
+    public Builder withIntegerField(String fieldName) {
+      return withField(fieldName, Types.INTEGER);
+    }
+
+    public Builder withBigIntField(String fieldName) {
+      return withField(fieldName, Types.BIGINT);
+    }
+
+    public Builder withFloatField(String fieldName) {
+      return withField(fieldName, Types.FLOAT);
+    }
+
+    public Builder withDoubleField(String fieldName) {
+      return withField(fieldName, Types.DOUBLE);
+    }
+
+    public Builder withDecimalField(String fieldName) {
+      return withField(fieldName, Types.DECIMAL);
+    }
+
+    public Builder withBooleanField(String fieldName) {
+      return withField(fieldName, Types.BOOLEAN);
+    }
+
+    public Builder withCharField(String fieldName) {
+      return withField(fieldName, Types.CHAR);
+    }
+
+    public Builder withVarcharField(String fieldName) {
+      return withField(fieldName, Types.VARCHAR);
+    }
+
+    public Builder withTimeField(String fieldName) {
+      return withField(fieldName, Types.TIME);
+    }
+
+    public Builder withDateField(String fieldName) {
+      return withField(fieldName, Types.DATE);
+    }
+
+    public Builder withTimestampField(String fieldName) {
+      return withField(fieldName, Types.TIMESTAMP);
+    }
+
+    /** Creates the {@link BeamRecordSqlType} from the fields added so far. */
+    public BeamRecordSqlType build() {
+      return create(fieldNames.build(), fieldTypes.build());
+    }
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
new file mode 100644
index 0000000..78ff221
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.sql.Types;
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamRecordSqlType}, exercising its {@link BeamRecordSqlType.Builder}.
+ */
+public class BeamRecordSqlTypeTest {
+  // java.sql.Types codes, in the same order as the typed with*Field() calls in the test.
+  private static final List<Integer> TYPES = ImmutableList.of(
+      Types.TINYINT,
+      Types.SMALLINT,
+      Types.INTEGER,
+      Types.BIGINT,
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.DECIMAL,
+      Types.BOOLEAN,
+      Types.CHAR,
+      Types.VARCHAR,
+      Types.TIME,
+      Types.DATE,
+      Types.TIMESTAMP);
+  // Names paired by index with TYPES; added through the generic withField(name, type).
+  private static final List<String> NAMES = ImmutableList.of(
+      "TINYINT_FIELD",
+      "SMALLINT_FIELD",
+      "INTEGER_FIELD",
+      "BIGINT_FIELD",
+      "FLOAT_FIELD",
+      "DOUBLE_FIELD",
+      "DECIMAL_FIELD",
+      "BOOLEAN_FIELD",
+      "CHAR_FIELD",
+      "VARCHAR_FIELD",
+      "TIME_FIELD",
+      "DATE_FIELD",
+      "TIMESTAMP_FIELD");
+  // Names for a second set of fields, added through the typed with*Field(name) shortcuts.
+  private static final List<String> MORE_NAMES = ImmutableList.of(
+      "ANOTHER_TINYINT_FIELD",
+      "ANOTHER_SMALLINT_FIELD",
+      "ANOTHER_INTEGER_FIELD",
+      "ANOTHER_BIGINT_FIELD",
+      "ANOTHER_FLOAT_FIELD",
+      "ANOTHER_DOUBLE_FIELD",
+      "ANOTHER_DECIMAL_FIELD",
+      "ANOTHER_BOOLEAN_FIELD",
+      "ANOTHER_CHAR_FIELD",
+      "ANOTHER_VARCHAR_FIELD",
+      "ANOTHER_TIME_FIELD",
+      "ANOTHER_DATE_FIELD",
+      "ANOTHER_TIMESTAMP_FIELD");
+
+  @Test
+  public void testBuildsWithCorrectFields() throws Exception {
+    BeamRecordSqlType.Builder recordTypeBuilder = BeamRecordSqlType.builder();
+    // First pass: one field per entry of TYPES via the generic method.
+    for (int i = 0; i < TYPES.size(); i++) {
+      recordTypeBuilder.withField(NAMES.get(i), TYPES.get(i));
+    }
+    // Second pass: the same types again, this time via the type-specific methods.
+    recordTypeBuilder.withTinyIntField(MORE_NAMES.get(0));
+    recordTypeBuilder.withSmallIntField(MORE_NAMES.get(1));
+    recordTypeBuilder.withIntegerField(MORE_NAMES.get(2));
+    recordTypeBuilder.withBigIntField(MORE_NAMES.get(3));
+    recordTypeBuilder.withFloatField(MORE_NAMES.get(4));
+    recordTypeBuilder.withDoubleField(MORE_NAMES.get(5));
+    recordTypeBuilder.withDecimalField(MORE_NAMES.get(6));
+    recordTypeBuilder.withBooleanField(MORE_NAMES.get(7));
+    recordTypeBuilder.withCharField(MORE_NAMES.get(8));
+    recordTypeBuilder.withVarcharField(MORE_NAMES.get(9));
+    recordTypeBuilder.withTimeField(MORE_NAMES.get(10));
+    recordTypeBuilder.withDateField(MORE_NAMES.get(11));
+    recordTypeBuilder.withTimestampField(MORE_NAMES.get(12));
+
+    BeamRecordSqlType recordSqlType = recordTypeBuilder.build();
+    // Expected result: both passes concatenated, in insertion order.
+    List<String> expectedNames = ImmutableList.<String>builder()
+        .addAll(NAMES)
+        .addAll(MORE_NAMES)
+        .build();
+
+    List<Integer> expectedTypes = ImmutableList.<Integer>builder()
+        .addAll(TYPES)
+        .addAll(TYPES)
+        .build();
+
+    assertEquals(expectedNames, recordSqlType.getFieldNames());
+    assertEquals(expectedTypes, recordSqlType.getFieldTypes());
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 3395269..5997540 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -19,13 +19,12 @@
 package org.apache.beam.sdk.extensions.sql.integrationtest;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import java.math.BigDecimal;
 import java.sql.Types;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -44,35 +43,42 @@ import org.junit.Rule;
  * Base class for all built-in functions integration tests.
  */
 public class BeamSqlBuiltinFunctionsIntegrationTestBase {
-  private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
-  static {
-    JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
-    JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
-    JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
-    JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
-    JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
-    JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
-    JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
-    JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
-    JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
-    JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
-  }
+  private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = ImmutableMap
+      .<Class, Integer> builder()
+      .put(Byte.class, Types.TINYINT)
+      .put(Short.class, Types.SMALLINT)
+      .put(Integer.class, Types.INTEGER)
+      .put(Long.class, Types.BIGINT)
+      .put(Float.class, Types.FLOAT)
+      .put(Double.class, Types.DOUBLE)
+      .put(BigDecimal.class, Types.DECIMAL)
+      .put(String.class, Types.VARCHAR)
+      .put(Date.class, Types.DATE)
+      .put(Boolean.class, Types.BOOLEAN)
+      .build();
+
+  private static final BeamRecordSqlType RECORD_SQL_TYPE = BeamRecordSqlType.builder()
+      .withDateField("ts")
+      .withTinyIntField("c_tinyint")
+      .withSmallIntField("c_smallint")
+      .withIntegerField("c_integer")
+      .withBigIntField("c_bigint")
+      .withFloatField("c_float")
+      .withDoubleField("c_double")
+      .withDecimalField("c_decimal")
+      .withTinyIntField("c_tinyint_max")
+      .withSmallIntField("c_smallint_max")
+      .withIntegerField("c_integer_max")
+      .withBigIntField("c_bigint_max")
+      .build();
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   protected PCollection<BeamRecord> getTestPCollection() {
-    BeamRecordSqlType type = BeamRecordSqlType.create(
-        Arrays.asList("ts", "c_tinyint", "c_smallint",
-            "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
-            "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
-        Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
-            Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
-            Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
-    );
     try {
       return MockedBoundedTable
-          .of(type)
+          .of(RECORD_SQL_TYPE)
           .addRows(
               parseDate("1986-02-15 11:35:26"),
               (byte) 1,
@@ -88,7 +94,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
               9223372036854775807L
           )
           .buildIOReader(pipeline)
-          .setCoder(type.getRecordCoder());
+          .setCoder(RECORD_SQL_TYPE.getRecordCoder());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.