You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/29 04:23:36 UTC
[beam] 02/02: [BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder
This is an automated email from the ASF dual-hosted git repository.
xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2eb7de0fe6e96da9805fc827294da1e1329ff716
Author: Anton Kedin <ke...@google.com>
AuthorDate: Wed Nov 22 19:17:21 2017 -0800
[BEAM-3238][SQL] Add BeamRecordSqlTypeBuilder
This improves the readability of code that creates BeamRecordSqlType instances.
---
.../beam/sdk/extensions/sql/BeamRecordSqlType.java | 85 ++++++++++++++-
.../sdk/extensions/sql/BeamRecordSqlTypeTest.java | 115 +++++++++++++++++++++
...BeamSqlBuiltinFunctionsIntegrationTestBase.java | 56 +++++-----
3 files changed, 229 insertions(+), 27 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 1784ec1..a6b23b6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.sql.Types;
@@ -108,8 +109,8 @@ public class BeamRecordSqlType extends BeamRecordType {
Integer fieldType = fieldTypes.get(idx);
if (!CODERS.containsKey(fieldType)) {
- throw new UnsupportedOperationException(
- "Data type: " + fieldType + " not supported yet!");
+ throw new UnsupportedOperationException(
+ "Data type: " + fieldType + " not supported yet!");
}
fieldCoders.add(CODERS.get(fieldType));
@@ -166,4 +167,84 @@ public class BeamRecordSqlType extends BeamRecordType {
return "BeamRecordSqlType [fieldNames=" + getFieldNames()
+ ", fieldTypes=" + fieldTypes + "]";
}
+
+ public static Builder builder() { // the Builder constructor is private, so callers obtain a builder here
+ return new Builder();
+ }
+
+ /**
+ * Fluent builder for {@link BeamRecordSqlType}: add fields in order, then call {@link #build()}.
+ */
+ public static class Builder {
+
+ private ImmutableList.Builder<String> fieldNames; // field names, in the order they were added
+ private ImmutableList.Builder<Integer> fieldTypes; // type code for the field name at the same index
+
+ public Builder withField(String fieldName, Integer fieldType) { // generic form; the typed helpers below delegate here
+ fieldNames.add(fieldName);
+ fieldTypes.add(fieldType);
+ return this; // returning this lets calls be chained
+ }
+
+ public Builder withTinyIntField(String fieldName) {
+ return withField(fieldName, Types.TINYINT);
+ }
+
+ public Builder withSmallIntField(String fieldName) {
+ return withField(fieldName, Types.SMALLINT);
+ }
+
+ public Builder withIntegerField(String fieldName) {
+ return withField(fieldName, Types.INTEGER);
+ }
+
+ public Builder withBigIntField(String fieldName) {
+ return withField(fieldName, Types.BIGINT);
+ }
+
+ public Builder withFloatField(String fieldName) {
+ return withField(fieldName, Types.FLOAT);
+ }
+
+ public Builder withDoubleField(String fieldName) {
+ return withField(fieldName, Types.DOUBLE);
+ }
+
+ public Builder withDecimalField(String fieldName) {
+ return withField(fieldName, Types.DECIMAL);
+ }
+
+ public Builder withBooleanField(String fieldName) {
+ return withField(fieldName, Types.BOOLEAN);
+ }
+
+ public Builder withCharField(String fieldName) {
+ return withField(fieldName, Types.CHAR);
+ }
+
+ public Builder withVarcharField(String fieldName) {
+ return withField(fieldName, Types.VARCHAR);
+ }
+
+ public Builder withTimeField(String fieldName) {
+ return withField(fieldName, Types.TIME);
+ }
+
+ public Builder withDateField(String fieldName) {
+ return withField(fieldName, Types.DATE);
+ }
+
+ public Builder withTimestampField(String fieldName) {
+ return withField(fieldName, Types.TIMESTAMP);
+ }
+
+ private Builder() { // private: instances are created through BeamRecordSqlType.builder()
+ this.fieldNames = ImmutableList.builder();
+ this.fieldTypes = ImmutableList.builder();
+ }
+
+ public BeamRecordSqlType build() {
+ return create(fieldNames.build(), fieldTypes.build()); // create(...) is declared outside this hunk; it receives the accumulated names and type codes
+ }
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
new file mode 100644
index 0000000..78ff221
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlTypeTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.sql.Types;
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamRecordSqlType}; currently exercises {@link BeamRecordSqlType.Builder}.
+ */
+public class BeamRecordSqlTypeTest {
+
+ private static final List<Integer> TYPES = ImmutableList.of( // one entry per typed builder helper; index-aligned with NAMES and MORE_NAMES
+ Types.TINYINT,
+ Types.SMALLINT,
+ Types.INTEGER,
+ Types.BIGINT,
+ Types.FLOAT,
+ Types.DOUBLE,
+ Types.DECIMAL,
+ Types.BOOLEAN,
+ Types.CHAR,
+ Types.VARCHAR,
+ Types.TIME,
+ Types.DATE,
+ Types.TIMESTAMP);
+
+ private static final List<String> NAMES = ImmutableList.of( // names used with the generic withField(name, type)
+ "TINYINT_FIELD",
+ "SMALLINT_FIELD",
+ "INTEGER_FIELD",
+ "BIGINT_FIELD",
+ "FLOAT_FIELD",
+ "DOUBLE_FIELD",
+ "DECIMAL_FIELD",
+ "BOOLEAN_FIELD",
+ "CHAR_FIELD",
+ "VARCHAR_FIELD",
+ "TIME_FIELD",
+ "DATE_FIELD",
+ "TIMESTAMP_FIELD");
+
+ private static final List<String> MORE_NAMES = ImmutableList.of( // names used with the typed withXxxField helpers
+ "ANOTHER_TINYINT_FIELD",
+ "ANOTHER_SMALLINT_FIELD",
+ "ANOTHER_INTEGER_FIELD",
+ "ANOTHER_BIGINT_FIELD",
+ "ANOTHER_FLOAT_FIELD",
+ "ANOTHER_DOUBLE_FIELD",
+ "ANOTHER_DECIMAL_FIELD",
+ "ANOTHER_BOOLEAN_FIELD",
+ "ANOTHER_CHAR_FIELD",
+ "ANOTHER_VARCHAR_FIELD",
+ "ANOTHER_TIME_FIELD",
+ "ANOTHER_DATE_FIELD",
+ "ANOTHER_TIMESTAMP_FIELD");
+
+ @Test
+ public void testBuildsWithCorrectFields() throws Exception {
+ BeamRecordSqlType.Builder recordTypeBuilder = BeamRecordSqlType.builder();
+
+ for (int i = 0; i < TYPES.size(); i++) { // first pass: add every type through the generic withField
+ recordTypeBuilder.withField(NAMES.get(i), TYPES.get(i));
+ }
+
+ recordTypeBuilder.withTinyIntField(MORE_NAMES.get(0)); // second pass: same types again via the typed helpers, in TYPES order
+ recordTypeBuilder.withSmallIntField(MORE_NAMES.get(1));
+ recordTypeBuilder.withIntegerField(MORE_NAMES.get(2));
+ recordTypeBuilder.withBigIntField(MORE_NAMES.get(3));
+ recordTypeBuilder.withFloatField(MORE_NAMES.get(4));
+ recordTypeBuilder.withDoubleField(MORE_NAMES.get(5));
+ recordTypeBuilder.withDecimalField(MORE_NAMES.get(6));
+ recordTypeBuilder.withBooleanField(MORE_NAMES.get(7));
+ recordTypeBuilder.withCharField(MORE_NAMES.get(8));
+ recordTypeBuilder.withVarcharField(MORE_NAMES.get(9));
+ recordTypeBuilder.withTimeField(MORE_NAMES.get(10));
+ recordTypeBuilder.withDateField(MORE_NAMES.get(11));
+ recordTypeBuilder.withTimestampField(MORE_NAMES.get(12));
+
+ BeamRecordSqlType recordSqlType = recordTypeBuilder.build();
+
+ List<String> expectedNames = ImmutableList.<String>builder() // insertion order: NAMES first, then MORE_NAMES
+ .addAll(NAMES)
+ .addAll(MORE_NAMES)
+ .build();
+
+ List<Integer> expectedTypes = ImmutableList.<Integer>builder() // TYPES appears twice, once per pass above
+ .addAll(TYPES)
+ .addAll(TYPES)
+ .build();
+
+ assertEquals(expectedNames, recordSqlType.getFieldNames());
+ assertEquals(expectedTypes, recordSqlType.getFieldTypes());
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 3395269..5997540 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -19,13 +19,12 @@
package org.apache.beam.sdk.extensions.sql.integrationtest;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -44,35 +43,42 @@ import org.junit.Rule;
* Base class for all built-in functions integration tests.
*/
public class BeamSqlBuiltinFunctionsIntegrationTestBase {
- private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>();
- static {
- JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER);
- JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT);
- JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT);
- JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE);
- JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL);
- JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR);
- JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE);
- JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN);
- }
+ private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = ImmutableMap // maps a Java value class to a java.sql.Types code
+ .<Class, Integer> builder()
+ .put(Byte.class, Types.TINYINT)
+ .put(Short.class, Types.SMALLINT)
+ .put(Integer.class, Types.INTEGER)
+ .put(Long.class, Types.BIGINT)
+ .put(Float.class, Types.FLOAT)
+ .put(Double.class, Types.DOUBLE)
+ .put(BigDecimal.class, Types.DECIMAL)
+ .put(String.class, Types.VARCHAR)
+ .put(Date.class, Types.DATE)
+ .put(Boolean.class, Types.BOOLEAN)
+ .build();
+
+ private static final BeamRecordSqlType RECORD_SQL_TYPE = BeamRecordSqlType.builder() // record type of the mocked table used by getTestPCollection()
+ .withDateField("ts")
+ .withTinyIntField("c_tinyint")
+ .withSmallIntField("c_smallint")
+ .withIntegerField("c_integer")
+ .withBigIntField("c_bigint")
+ .withFloatField("c_float")
+ .withDoubleField("c_double")
+ .withDecimalField("c_decimal")
+ .withTinyIntField("c_tinyint_max")
+ .withSmallIntField("c_smallint_max")
+ .withIntegerField("c_integer_max")
+ .withBigIntField("c_bigint_max")
+ .build();
@Rule
public final TestPipeline pipeline = TestPipeline.create();
protected PCollection<BeamRecord> getTestPCollection() {
- BeamRecordSqlType type = BeamRecordSqlType.create(
- Arrays.asList("ts", "c_tinyint", "c_smallint",
- "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
- "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"),
- Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT,
- Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL,
- Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT)
- );
try {
return MockedBoundedTable
- .of(type)
+ .of(RECORD_SQL_TYPE)
.addRows(
parseDate("1986-02-15 11:35:26"),
(byte) 1,
@@ -88,7 +94,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
9223372036854775807L
)
.buildIOReader(pipeline)
- .setCoder(type.getRecordCoder());
+ .setCoder(RECORD_SQL_TYPE.getRecordCoder());
} catch (Exception e) {
throw new RuntimeException(e);
}
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.