Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/04 17:09:47 UTC
[4/7] beam git commit: refactor BeamRecord, BeamRecordType, BeamSqlRecordType, BeamRecordCoder
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
new file mode 100644
index 0000000..b910c84
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * Helper methods and field {@link Coder}s for {@link BeamRecord} with SQL types.
+ */
+@Experimental
+public class BeamSqlRecordHelper {
+
+ public static BeamSqlRecordType getSqlRecordType(BeamRecord record) {
+ return (BeamSqlRecordType) record.getDataType();
+ }
+
+ /**
+ * {@link Coder} for Java type {@link Short}.
+ */
+ public static class ShortCoder extends CustomCoder<Short> {
+ private static final ShortCoder INSTANCE = new ShortCoder();
+
+ public static ShortCoder of() {
+ return INSTANCE;
+ }
+
+ private ShortCoder() {
+ }
+
+ @Override
+ public void encode(Short value, OutputStream outStream) throws CoderException, IOException {
+ new DataOutputStream(outStream).writeShort(value);
+ }
+
+ @Override
+ public Short decode(InputStream inStream) throws CoderException, IOException {
+ return new DataInputStream(inStream).readShort();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+ /**
+ * {@link Coder} for Java type {@link Float}; values are stored as {@link BigDecimal}.
+ */
+ public static class FloatCoder extends CustomCoder<Float> {
+ private static final FloatCoder INSTANCE = new FloatCoder();
+ private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+ public static FloatCoder of() {
+ return INSTANCE;
+ }
+
+ private FloatCoder() {
+ }
+
+ @Override
+ public void encode(Float value, OutputStream outStream) throws CoderException, IOException {
+ CODER.encode(new BigDecimal(value), outStream);
+ }
+
+ @Override
+ public Float decode(InputStream inStream) throws CoderException, IOException {
+ return CODER.decode(inStream).floatValue();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+ /**
+ * {@link Coder} for Java type {@link Double}; values are stored as {@link BigDecimal}.
+ */
+ public static class DoubleCoder extends CustomCoder<Double> {
+ private static final DoubleCoder INSTANCE = new DoubleCoder();
+ private static final BigDecimalCoder CODER = BigDecimalCoder.of();
+
+ public static DoubleCoder of() {
+ return INSTANCE;
+ }
+
+ private DoubleCoder() {
+ }
+
+ @Override
+ public void encode(Double value, OutputStream outStream) throws CoderException, IOException {
+ CODER.encode(new BigDecimal(value), outStream);
+ }
+
+ @Override
+ public Double decode(InputStream inStream) throws CoderException, IOException {
+ return CODER.decode(inStream).doubleValue();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+
+ /**
+ * {@link Coder} for Java type {@link GregorianCalendar}; values are stored as {@link Long}.
+ */
+ public static class TimeCoder extends CustomCoder<GregorianCalendar> {
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final TimeCoder INSTANCE = new TimeCoder();
+
+ public static TimeCoder of() {
+ return INSTANCE;
+ }
+
+ private TimeCoder() {
+ }
+
+ @Override
+ public void encode(GregorianCalendar value, OutputStream outStream)
+ throws CoderException, IOException {
+ longCoder.encode(value.getTime().getTime(), outStream);
+ }
+
+ @Override
+ public GregorianCalendar decode(InputStream inStream) throws CoderException, IOException {
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date(longCoder.decode(inStream)));
+ return calendar;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+ /**
+ * {@link Coder} for Java type {@link Date}; values are stored as {@link Long}.
+ */
+ public static class DateCoder extends CustomCoder<Date> {
+ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final DateCoder INSTANCE = new DateCoder();
+
+ public static DateCoder of() {
+ return INSTANCE;
+ }
+
+ private DateCoder() {
+ }
+
+ @Override
+ public void encode(Date value, OutputStream outStream) throws CoderException, IOException {
+ longCoder.encode(value.getTime(), outStream);
+ }
+
+ @Override
+ public Date decode(InputStream inStream) throws CoderException, IOException {
+ return new Date(longCoder.decode(inStream));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+
+ /**
+ * {@link Coder} for Java type {@link Boolean}.
+ */
+ public static class BooleanCoder extends CustomCoder<Boolean> {
+ private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+ public static BooleanCoder of() {
+ return INSTANCE;
+ }
+
+ private BooleanCoder() {
+ }
+
+ @Override
+ public void encode(Boolean value, OutputStream outStream) throws CoderException, IOException {
+ new DataOutputStream(outStream).writeBoolean(value);
+ }
+
+ @Override
+ public Boolean decode(InputStream inStream) throws CoderException, IOException {
+ return new DataInputStream(inStream).readBoolean();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ }
+ }
+}
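
The coders above are plain CustomCoders, so they can be exercised directly against in-memory streams. A minimal round-trip sketch, assuming the Beam Java SDK and the new class on the classpath (the value 42 and the buffer handling are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;

    public class ShortCoderRoundTrip {
      public static void main(String[] args) throws Exception {
        ShortCoder coder = ShortCoder.of();
        // Encode a short into an in-memory buffer...
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode((short) 42, out);
        // ...and decode it back; a coder must round-trip values exactly.
        short decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded); // prints 42
      }
    }
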
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
new file mode 100644
index 0000000..b295049
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.BooleanCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DateCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.DoubleCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.FloatCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.ShortCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.values.BeamRecord;
+import org.apache.beam.sdk.values.BeamRecordType;
+
+/**
+ * Type provider for {@link BeamRecord} with SQL types.
+ *
+ * <p>Only a limited set of SQL types is supported for now; see
+ * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
+ * for details.
+ *
+ */
+public class BeamSqlRecordType extends BeamRecordType {
+ private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+ static {
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+ SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+ }
+
+ public List<Integer> fieldsType;
+
+ protected BeamSqlRecordType(List<String> fieldsName, List<Coder> fieldsCoder) {
+ super(fieldsName, fieldsCoder);
+ }
+
+ private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType
+ , List<Coder> fieldsCoder) {
+ super(fieldsName, fieldsCoder);
+ this.fieldsType = fieldsType;
+ }
+
+ public static BeamSqlRecordType create(List<String> fieldNames,
+ List<Integer> fieldTypes) {
+ List<Coder> fieldCoders = new ArrayList<>();
+ for (int idx = 0; idx < fieldTypes.size(); ++idx) {
+ switch (fieldTypes.get(idx)) {
+ case Types.INTEGER:
+ fieldCoders.add(BigEndianIntegerCoder.of());
+ break;
+ case Types.SMALLINT:
+ fieldCoders.add(ShortCoder.of());
+ break;
+ case Types.TINYINT:
+ fieldCoders.add(ByteCoder.of());
+ break;
+ case Types.DOUBLE:
+ fieldCoders.add(DoubleCoder.of());
+ break;
+ case Types.FLOAT:
+ fieldCoders.add(FloatCoder.of());
+ break;
+ case Types.DECIMAL:
+ fieldCoders.add(BigDecimalCoder.of());
+ break;
+ case Types.BIGINT:
+ fieldCoders.add(BigEndianLongCoder.of());
+ break;
+ case Types.VARCHAR:
+ case Types.CHAR:
+ fieldCoders.add(StringUtf8Coder.of());
+ break;
+ case Types.TIME:
+ fieldCoders.add(TimeCoder.of());
+ break;
+ case Types.DATE:
+ case Types.TIMESTAMP:
+ fieldCoders.add(DateCoder.of());
+ break;
+ case Types.BOOLEAN:
+ fieldCoders.add(BooleanCoder.of());
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Data type: " + fieldTypes.get(idx) + " not supported yet!");
+ }
+ }
+ return new BeamSqlRecordType(fieldNames, fieldTypes, fieldCoders);
+ }
+
+ @Override
+ public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
+ int fieldType = fieldsType.get(index);
+ Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
+ if (javaClazz == null) {
+ throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
+ }
+
+ if (!fieldValue.getClass().equals(javaClazz)) {
+ throw new IllegalArgumentException(
+ String.format("[%s](%s) doesn't match type [%s]",
+ fieldValue, fieldValue.getClass(), fieldType)
+ );
+ }
+ }
+
+ public List<Integer> getFieldsType() {
+ return fieldsType;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof BeamSqlRecordType) {
+ BeamSqlRecordType ins = (BeamSqlRecordType) obj;
+ return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
+ }
+}
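
A note on usage: create() wires each java.sql.Types code to a matching field coder, so a record type is fully described by two parallel lists. A short sketch of building one (the field names "id", "name" and "score" are made up for illustration):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;

    public class RecordTypeSketch {
      public static void main(String[] args) {
        // Names and java.sql.Types codes are parallel lists, one entry per field.
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("id", "name", "score"),
            Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE));
        System.out.println(type.getFieldsType()); // [4, 12, 8]
      }
    }
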
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
deleted file mode 100644
index cb5c7ea..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.util.List;
-import org.apache.beam.sdk.values.BeamRecord;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link BeamSqlRow} represents one row element in a {@link PCollection},
- * with type provider {@link BeamSqlRowType}.
- */
-public class BeamSqlRow extends BeamRecord {
- public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
- super(dataType, dataValues);
- }
-
- public BeamSqlRow(BeamSqlRowType dataType) {
- super(dataType);
- }
-
- @Override
- public BeamSqlRowType getDataType() {
- return (BeamSqlRowType) super.getDataType();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index c7656af..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.Types;
-import java.util.BitSet;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.BitSetCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * A {@link Coder} encodes {@link BeamSqlRow}.
- */
-public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
- private BeamSqlRowType sqlRecordType;
-
- private static final BitSetCoder nullListCoder = BitSetCoder.of();
-
- private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
- private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
- private static final DoubleCoder doubleCoder = DoubleCoder.of();
- private static final InstantCoder instantCoder = InstantCoder.of();
- private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
- private static final ByteCoder byteCoder = ByteCoder.of();
-
- public BeamSqlRowCoder(BeamSqlRowType sqlRecordType) {
- this.sqlRecordType = sqlRecordType;
- }
-
- @Override
- public void encode(BeamSqlRow value, OutputStream outStream)
- throws CoderException, IOException {
- nullListCoder.encode(value.getNullFields(), outStream);
- for (int idx = 0; idx < value.size(); ++idx) {
- if (value.getNullFields().get(idx)) {
- continue;
- }
-
- switch (sqlRecordType.getFieldsType().get(idx)) {
- case Types.INTEGER:
- intCoder.encode(value.getInteger(idx), outStream);
- break;
- case Types.SMALLINT:
- intCoder.encode((int) value.getShort(idx), outStream);
- break;
- case Types.TINYINT:
- byteCoder.encode(value.getByte(idx), outStream);
- break;
- case Types.DOUBLE:
- doubleCoder.encode(value.getDouble(idx), outStream);
- break;
- case Types.FLOAT:
- doubleCoder.encode((double) value.getFloat(idx), outStream);
- break;
- case Types.DECIMAL:
- bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
- break;
- case Types.BIGINT:
- longCoder.encode(value.getLong(idx), outStream);
- break;
- case Types.VARCHAR:
- case Types.CHAR:
- stringCoder.encode(value.getString(idx), outStream);
- break;
- case Types.TIME:
- longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
- break;
- case Types.DATE:
- case Types.TIMESTAMP:
- longCoder.encode(value.getDate(idx).getTime(), outStream);
- break;
- case Types.BOOLEAN:
- byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Data type: " + sqlRecordType.getFieldsType().get(idx) + " not supported yet!");
- }
- }
-
- instantCoder.encode(value.getWindowStart(), outStream);
- instantCoder.encode(value.getWindowEnd(), outStream);
- }
-
- @Override
- public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
- BitSet nullFields = nullListCoder.decode(inStream);
-
- BeamSqlRow record = new BeamSqlRow(sqlRecordType);
- record.setNullFields(nullFields);
- for (int idx = 0; idx < sqlRecordType.size(); ++idx) {
- if (nullFields.get(idx)) {
- continue;
- }
-
- switch (sqlRecordType.getFieldsType().get(idx)) {
- case Types.INTEGER:
- record.addField(idx, intCoder.decode(inStream));
- break;
- case Types.SMALLINT:
- record.addField(idx, intCoder.decode(inStream).shortValue());
- break;
- case Types.TINYINT:
- record.addField(idx, byteCoder.decode(inStream));
- break;
- case Types.DOUBLE:
- record.addField(idx, doubleCoder.decode(inStream));
- break;
- case Types.FLOAT:
- record.addField(idx, doubleCoder.decode(inStream).floatValue());
- break;
- case Types.BIGINT:
- record.addField(idx, longCoder.decode(inStream));
- break;
- case Types.DECIMAL:
- record.addField(idx, bigDecimalCoder.decode(inStream));
- break;
- case Types.VARCHAR:
- case Types.CHAR:
- record.addField(idx, stringCoder.decode(inStream));
- break;
- case Types.TIME:
- GregorianCalendar calendar = new GregorianCalendar();
- calendar.setTime(new Date(longCoder.decode(inStream)));
- record.addField(idx, calendar);
- break;
- case Types.DATE:
- case Types.TIMESTAMP:
- record.addField(idx, new Date(longCoder.decode(inStream)));
- break;
- case Types.BOOLEAN:
- record.addField(idx, byteCoder.decode(inStream) == 1);
- break;
-
- default:
- throw new UnsupportedOperationException("Data type: "
- + sqlRecordType.getFieldsType().get(idx)
- + " not supported yet!");
- }
- }
-
- record.setWindowStart(instantCoder.decode(inStream));
- record.setWindowEnd(instantCoder.decode(inStream));
-
- return record;
- }
-
- public BeamSqlRowType getSqlRecordType() {
- return sqlRecordType;
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- }
-}
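
For reference, the deleted coder's wire format was: a BitSet marking the null fields, then each non-null field encoded with a type-specific coder, then the window start/end instants. A self-contained sketch of just the null-mask part of that layout, built from stock SDK coders (writeRow and the Integer[] row shape are illustrative, not the deleted API):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.BitSet;
    import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
    import org.apache.beam.sdk.coders.BitSetCoder;

    public class NullMaskSketch {
      // Write a null mask first, then only the non-null values, mirroring
      // the layout used by the deleted BeamSqlRowCoder.
      static byte[] writeRow(Integer[] fields) throws IOException {
        BitSet nulls = new BitSet(fields.length);
        for (int i = 0; i < fields.length; i++) {
          if (fields[i] == null) {
            nulls.set(i);
          }
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BitSetCoder.of().encode(nulls, out);
        for (Integer f : fields) {
          if (f != null) {
            BigEndianIntegerCoder.of().encode(f, out);
          }
        }
        return out.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        // The middle field is null, so only two ints follow the mask.
        System.out.println(writeRow(new Integer[] {1, null, 3}).length);
      }
    }
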
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
deleted file mode 100644
index 7584dad..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.schema;
-
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.values.BeamRecordTypeProvider;
-
-/**
- * Type provider for {@link BeamSqlRow} with SQL types.
- *
- * <p>Limited SQL types are supported now, visit
- * <a href="https://beam.apache.org/blog/2017/07/21/sql-dsl.html#data-type">data types</a>
- * for more details.
- *
- */
-public class BeamSqlRowType extends BeamRecordTypeProvider {
- private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
- static {
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
- SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
- SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
- }
-
- public List<Integer> fieldsType;
-
- protected BeamSqlRowType(List<String> fieldsName) {
- super(fieldsName);
- }
-
- public BeamSqlRowType(List<String> fieldsName, List<Integer> fieldsType) {
- super(fieldsName);
- this.fieldsType = fieldsType;
- }
-
- public static BeamSqlRowType create(List<String> fieldNames,
- List<Integer> fieldTypes) {
- return new BeamSqlRowType(fieldNames, fieldTypes);
- }
-
- @Override
- public void validateValueType(int index, Object fieldValue) throws IllegalArgumentException {
- int fieldType = fieldsType.get(index);
- Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(fieldType);
- if (javaClazz == null) {
- throw new IllegalArgumentException("Data type: " + fieldType + " not supported yet!");
- }
-
- if (!fieldValue.getClass().equals(javaClazz)) {
- throw new IllegalArgumentException(
- String.format("[%s](%s) doesn't match type [%s]",
- fieldValue, fieldValue.getClass(), fieldType)
- );
- }
- }
-
- public List<Integer> getFieldsType() {
- return fieldsType;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof BeamSqlRowType) {
- BeamSqlRowType ins = (BeamSqlRowType) obj;
- return fieldsType.equals(ins.getFieldsType()) && getFieldsName().equals(ins.getFieldsName());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * getFieldsName().hashCode() + getFieldsType().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
index c179935..b370d9d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -37,16 +38,16 @@ public interface BeamSqlTable {
* create a {@code PCollection<BeamSqlRow>} from source.
*
*/
- PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+ PCollection<BeamRecord> buildIOReader(Pipeline pipeline);
/**
* create a {@code IO.write()} instance to write to target.
*
*/
- PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+ PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter();
/**
* Get the schema info of the table.
*/
- BeamSqlRowType getRowType();
+ BeamSqlRecordType getRowType();
}
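
With this change the table contract is expressed entirely in terms of BeamRecord. A caller-side sketch under the new signatures (copyTable is a made-up helper, not part of the interface):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlTable;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;

    public class TableContractSketch {
      // Read records from the table, then hand the same collection to its
      // writer; both sides now speak BeamRecord rather than BeamSqlRow.
      static void copyTable(Pipeline pipeline, BeamSqlTable table) {
        PCollection<BeamRecord> rows = table.buildIOReader(pipeline);
        rows.apply("write", table.buildIOWriter());
      }
    }
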
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index c769928..63c9720 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -23,6 +23,7 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.math.BigDecimal;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.commons.csv.CSVFormat;
@@ -34,11 +35,11 @@ import org.apache.commons.csv.CSVRecord;
* Utility methods for working with {@code BeamTable}.
*/
public final class BeamTableUtils {
- public static BeamSqlRow csvLine2BeamSqlRow(
+ public static BeamRecord csvLine2BeamSqlRow(
CSVFormat csvFormat,
String line,
- BeamSqlRowType beamSqlRowType) {
- BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+ BeamSqlRecordType beamSqlRowType) {
+ BeamRecord row = new BeamRecord(beamSqlRowType);
try (StringReader reader = new StringReader(line)) {
CSVParser parser = csvFormat.parse(reader);
CSVRecord rawRecord = parser.getRecords().get(0);
@@ -60,7 +61,7 @@ public final class BeamTableUtils {
return row;
}
- public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+ public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
StringWriter writer = new StringWriter();
try (CSVPrinter printer = csvFormat.print(writer)) {
for (int i = 0; i < row.size(); i++) {
@@ -73,13 +74,14 @@ public final class BeamTableUtils {
return writer.toString();
}
- public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+ public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) {
if (rawObj == null) {
row.addField(idx, null);
return;
}
- SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+ SqlTypeName columnType = CalciteUtils.getFieldType(BeamSqlRecordHelper.getSqlRecordType(row)
+ , idx);
// auto-casting for numerics
if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
|| (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
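
A usage sketch of the updated helpers, converting one CSV line to a BeamRecord and back (the two-column schema and the input line are illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.commons.csv.CSVFormat;

    public class CsvLineSketch {
      public static void main(String[] args) {
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("id", "name"),
            Arrays.asList(Types.INTEGER, Types.VARCHAR));
        // "1" is auto-cast to Integer for the INTEGER column.
        BeamRecord row = BeamTableUtils.csvLine2BeamSqlRow(CSVFormat.DEFAULT, "1,alice", type);
        System.out.println(BeamTableUtils.beamSqlRow2CsvLine(row, CSVFormat.DEFAULT));
      }
    }
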
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
index 2a50947..f137379 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -18,12 +18,12 @@
package org.apache.beam.sdk.extensions.sql.schema.kafka;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
@@ -34,45 +34,45 @@ import org.apache.commons.csv.CSVFormat;
*/
public class BeamKafkaCSVTable extends BeamKafkaTable {
private CSVFormat csvFormat;
- public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
List<String> topics) {
this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
}
- public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaCSVTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
List<String> topics, CSVFormat format) {
super(beamSqlRowType, bootstrapServers, topics);
this.csvFormat = format;
}
@Override
- public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
getPTransformForInput() {
return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
}
@Override
- public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ public PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
getPTransformForOutput() {
return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
}
/**
- * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+ * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamRecord}.
*
*/
public static class CsvRecorderDecoder
- extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
- private BeamSqlRowType rowType;
+ extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>> {
+ private BeamSqlRecordType rowType;
private CSVFormat format;
- public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+ public CsvRecorderDecoder(BeamSqlRecordType rowType, CSVFormat format) {
this.rowType = rowType;
this.format = format;
}
@Override
- public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
- return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+ public PCollection<BeamRecord> expand(PCollection<KV<byte[], byte[]>> input) {
+ return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamRecord>() {
@ProcessElement
public void processElement(ProcessContext c) {
String rowInString = new String(c.element().getValue());
@@ -83,24 +83,24 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
}
/**
- * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+ * A PTransform to convert {@link BeamRecord} to {@code KV<byte[], byte[]>}.
*
*/
public static class CsvRecorderEncoder
- extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
- private BeamSqlRowType rowType;
+ extends PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>> {
+ private BeamSqlRecordType rowType;
private CSVFormat format;
- public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+ public CsvRecorderEncoder(BeamSqlRecordType rowType, CSVFormat format) {
this.rowType = rowType;
this.format = format;
}
@Override
- public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+ public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamRecord> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, KV<byte[], byte[]>>() {
@ProcessElement
public void processElement(ProcessContext c) {
- BeamSqlRow in = c.element();
+ BeamRecord in = c.element();
c.output(KV.of(new byte[] {}, BeamTableUtils.beamSqlRow2CsvLine(in, format).getBytes()));
}
}));
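
Constructing the refactored Kafka table only changes the schema type in the signature. A sketch (the broker address and topic name are placeholders):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.extensions.sql.schema.kafka.BeamKafkaCSVTable;
    import org.apache.commons.csv.CSVFormat;

    public class KafkaTableSketch {
      public static void main(String[] args) {
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("id", "price"),
            Arrays.asList(Types.INTEGER, Types.DOUBLE));
        // localhost:9092 and "orders" stand in for real brokers and topics.
        BeamKafkaCSVTable table = new BeamKafkaCSVTable(
            type, "localhost:9092", Arrays.asList("orders"), CSVFormat.DEFAULT);
        System.out.println(table.getRowType().getFieldsName());
      }
    }
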
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
index 2cc664f..fac57bf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -26,10 +26,10 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -48,11 +48,11 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
private List<String> topics;
private Map<String, Object> configUpdates;
- protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+ protected BeamKafkaTable(BeamSqlRecordType beamSqlRowType) {
super(beamSqlRowType);
}
- public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+ public BeamKafkaTable(BeamSqlRecordType beamSqlRowType, String bootstrapServers,
List<String> topics) {
super(beamSqlRowType);
this.bootstrapServers = bootstrapServers;
@@ -69,14 +69,14 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
return BeamIOType.UNBOUNDED;
}
- public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+ public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamRecord>>
getPTransformForInput();
- public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+ public abstract PTransform<PCollection<BeamRecord>, PCollection<KV<byte[], byte[]>>>
getPTransformForOutput();
@Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
return PBegin.in(pipeline).apply("read",
KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(bootstrapServers)
@@ -89,13 +89,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
}
@Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
checkArgument(topics != null && topics.size() == 1,
"Only one topic can be acceptable as output.");
- return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+ return new PTransform<PCollection<BeamRecord>, PDone>() {
@Override
- public PDone expand(PCollection<BeamSqlRow> input) {
+ public PDone expand(PCollection<BeamRecord> input) {
return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
KafkaIO.<byte[], byte[]>write()
.withBootstrapServers(bootstrapServers)
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
index c44faab..0ec418c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTable.java
@@ -19,10 +19,10 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -46,25 +46,25 @@ public class BeamTextCSVTable extends BeamTextTable {
/**
* CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
*/
- public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
}
- public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
+ public BeamTextCSVTable(BeamSqlRecordType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
super(beamSqlRowType, filePattern);
this.csvFormat = csvFormat;
}
@Override
- public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
.apply("parseCSVLine",
new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
}
@Override
- public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
}
}
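
End to end, the text table reads a file pattern into a PCollection<BeamRecord> and its writer produces one back out. A sketch wiring both sides into a pipeline (the schema and the file pattern are placeholders; running it needs matching CSV files and a runner):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.extensions.sql.schema.text.BeamTextCSVTable;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;

    public class TextTableSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        BeamSqlRecordType type = BeamSqlRecordType.create(
            Arrays.asList("id", "name"),
            Arrays.asList(Types.INTEGER, Types.VARCHAR));
        BeamTextCSVTable table = new BeamTextCSVTable(type, "/tmp/orders*.csv");
        // Read CSV lines as records, then write them straight back out.
        PCollection<BeamRecord> rows = table.buildIOReader(pipeline);
        rows.apply("writeBack", table.buildIOWriter());
        pipeline.run().waitUntilFinish();
      }
    }
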
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
index 06109c3..ecb77e0 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -19,12 +19,12 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.csv.CSVFormat;
@@ -32,13 +32,13 @@ import org.apache.commons.csv.CSVFormat;
* IOReader for {@code BeamTextCSVTable}.
*/
public class BeamTextCSVTableIOReader
- extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
+ extends PTransform<PCollection<String>, PCollection<BeamRecord>>
implements Serializable {
private String filePattern;
- protected BeamSqlRowType beamSqlRowType;
+ protected BeamSqlRecordType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
+ public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
this.beamSqlRowType = beamSqlRowType;
@@ -46,8 +46,8 @@ public class BeamTextCSVTableIOReader
}
@Override
- public PCollection<BeamSqlRow> expand(PCollection<String> input) {
- return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
+ public PCollection<BeamRecord> expand(PCollection<String> input) {
+ return input.apply(ParDo.of(new DoFn<String, BeamRecord>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
String str = ctx.element();
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
index 1684b37..c616973 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -19,13 +19,13 @@
package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.csv.CSVFormat;
@@ -33,24 +33,24 @@ import org.apache.commons.csv.CSVFormat;
/**
* IOWriter for {@code BeamTextCSVTable}.
*/
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamRecord>, PDone>
implements Serializable {
private String filePattern;
- protected BeamSqlRowType beamSqlRowType;
+ protected BeamSqlRecordType beamSqlRowType;
protected CSVFormat csvFormat;
- public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
+ public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRowType, String filePattern,
CSVFormat csvFormat) {
this.filePattern = filePattern;
this.beamSqlRowType = beamSqlRowType;
this.csvFormat = csvFormat;
}
- @Override public PDone expand(PCollection<BeamSqlRow> input) {
- return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() {
+ @Override public PDone expand(PCollection<BeamRecord> input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn<BeamRecord, String>() {
@ProcessElement public void processElement(ProcessContext ctx) {
- BeamSqlRow row = ctx.element();
+ BeamRecord row = ctx.element();
ctx.output(BeamTableUtils.beamSqlRow2CsvLine(row, csvFormat));
}
})).apply(TextIO.write().to(filePattern));
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
index e85608d..4284366 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/text/BeamTextTable.java
@@ -21,7 +21,7 @@ package org.apache.beam.sdk.extensions.sql.schema.text;
import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
/**
* {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
protected String filePattern;
- protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+ protected BeamTextTable(BeamSqlRecordType beamSqlRowType, String filePattern) {
super(beamSqlRowType);
this.filePattern = filePattern;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index e6ca18f..8501157 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -49,16 +49,16 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runAggregationWithoutWindow(unboundedInput1);
}
- private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception {
+ private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("f_int2", 0);
record.addField("size", 4L);
@@ -83,7 +83,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runAggregationFunctions(unboundedInput1);
}
- private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{
+ private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{
String sql = "select f_int2, count(*) as size, "
+ "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
+ "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
@@ -94,11 +94,11 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
+ "max(f_timestamp) as max6, min(f_timestamp) as min6 "
+ "FROM TABLE_A group by f_int2";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testAggregationFunctions", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(
Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
"min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
"max5", "min5", "max6", "min6"),
@@ -108,7 +108,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
Types.TIMESTAMP, Types.TIMESTAMP));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("f_int2", 0);
record.addField("size", 4L);
@@ -161,28 +161,28 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runDistinct(unboundedInput1);
}
- private void runDistinct(PCollection<BeamSqlRow> input) throws Exception {
+ private void runDistinct(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION ";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
input.apply("testDistinct", BeamSql.simpleQuery(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int", 1);
record1.addField("f_long", 1000L);
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int", 2);
record2.addField("f_long", 2000L);
- BeamSqlRow record3 = new BeamSqlRow(resultType);
+ BeamRecord record3 = new BeamRecord(resultType);
record3.addField("f_int", 3);
record3.addField("f_long", 3000L);
- BeamSqlRow record4 = new BeamSqlRow(resultType);
+ BeamRecord record4 = new BeamRecord(resultType);
record4.addField("f_int", 4);
record4.addField("f_long", 4000L);
@@ -207,27 +207,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runTumbleWindow(unboundedInput1);
}
- private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception {
+ private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+ " FROM TABLE_A"
+ " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testTumbleWindow", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int2", 0);
record1.addField("size", 3L);
record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int2", 0);
record2.addField("size", 1L);
record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
@@ -255,40 +255,40 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runHopWindow(unboundedInput1);
}
- private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception {
+ private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+ " FROM PCOLLECTION"
+ " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
input.apply("testHopWindow", BeamSql.simpleQuery(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int2", 0);
record1.addField("size", 3L);
record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime()));
record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int2", 0);
record2.addField("size", 3L);
record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime()));
record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime()));
- BeamSqlRow record3 = new BeamSqlRow(resultType);
+ BeamRecord record3 = new BeamRecord(resultType);
record3.addField("f_int2", 0);
record3.addField("size", 1L);
record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime()));
record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime()));
- BeamSqlRow record4 = new BeamSqlRow(resultType);
+ BeamRecord record4 = new BeamRecord(resultType);
record4.addField("f_int2", 0);
record4.addField("size", 1L);
record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
@@ -316,27 +316,27 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
runSessionWindow(unboundedInput1);
}
- private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception {
+ private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+ " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+ " FROM TABLE_A"
+ " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testSessionWindow", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(
Arrays.asList("f_int2", "size", "window_start"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int2", 0);
record1.addField("size", 3L);
record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime()));
record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime()));
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int2", 0);
record2.addField("size", 1L);
record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
@@ -357,8 +357,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+ "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
.apply("testWindowOnNonTimestampField", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
@@ -372,7 +372,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
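
Note on the pattern above: the hunks in this test are a mechanical rename — BeamSqlRow becomes the general-purpose BeamRecord from org.apache.beam.sdk.values, while the SQL-specific schema lives on as BeamSqlRecordType. A minimal sketch of the expected-record construction these aggregation tests now share, assuming only the constructors and methods visible in this diff (the wrapper class name is illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.values.BeamRecord;

    public class ExpectedRecordSketch {
      public static void main(String[] args) {
        // Schema: field names paired with java.sql.Types constants.
        BeamSqlRecordType resultType = BeamSqlRecordType.create(
            Arrays.asList("f_int", "f_long"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));

        // Records are populated field-by-field against that schema.
        BeamRecord record = new BeamRecord(resultType);
        record.addField("f_int", 1);
        record.addField("f_long", 1000L);
      }
    }
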
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index 0c1ce1c..d09caf0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -25,12 +25,11 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
@@ -53,20 +52,20 @@ public class BeamSqlDslBase {
@Rule
public ExpectedException exceptions = ExpectedException.none();
- public static BeamSqlRowType rowTypeInTableA;
- public static List<BeamSqlRow> recordsInTableA;
+ public static BeamSqlRecordType rowTypeInTableA;
+ public static List<BeamRecord> recordsInTableA;
//bounded PCollections
- public PCollection<BeamSqlRow> boundedInput1;
- public PCollection<BeamSqlRow> boundedInput2;
+ public PCollection<BeamRecord> boundedInput1;
+ public PCollection<BeamRecord> boundedInput2;
//unbounded PCollections
- public PCollection<BeamSqlRow> unboundedInput1;
- public PCollection<BeamSqlRow> unboundedInput2;
+ public PCollection<BeamRecord> unboundedInput1;
+ public PCollection<BeamRecord> unboundedInput2;
@BeforeClass
public static void prepareClass() throws ParseException {
- rowTypeInTableA = BeamSqlRowType.create(
+ rowTypeInTableA = BeamSqlRecordType.create(
Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
"f_timestamp", "f_int2", "f_decimal"),
Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
@@ -78,20 +77,20 @@ public class BeamSqlDslBase {
@Before
public void preparePCollections(){
boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
- Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+ Create.of(recordsInTableA).withCoder(rowTypeInTableA.getRecordCoder()));
boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
- Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
+ Create.of(recordsInTableA.get(0)).withCoder(rowTypeInTableA.getRecordCoder()));
unboundedInput1 = prepareUnboundedPCollection1();
unboundedInput2 = prepareUnboundedPCollection2();
}
- private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
- TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(rowTypeInTableA));
+ private PCollection<BeamRecord> prepareUnboundedPCollection1() {
+ TestStream.Builder<BeamRecord> values = TestStream
+ .create(rowTypeInTableA.getRecordCoder());
- for (BeamSqlRow row : recordsInTableA) {
+ for (BeamRecord row : recordsInTableA) {
values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
values = values.addElements(row);
}
@@ -99,21 +98,21 @@ public class BeamSqlDslBase {
return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity());
}
- private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
- TestStream.Builder<BeamSqlRow> values = TestStream
- .create(new BeamSqlRowCoder(rowTypeInTableA));
+ private PCollection<BeamRecord> prepareUnboundedPCollection2() {
+ TestStream.Builder<BeamRecord> values = TestStream
+ .create(rowTypeInTableA.getRecordCoder());
- BeamSqlRow row = recordsInTableA.get(0);
+ BeamRecord row = recordsInTableA.get(0);
values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
values = values.addElements(row);
return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
}
- private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
- List<BeamSqlRow> rows = new ArrayList<>();
+ private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException{
+ List<BeamRecord> rows = new ArrayList<>();
- BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
+ BeamRecord row1 = new BeamRecord(rowTypeInTableA);
row1.addField(0, 1);
row1.addField(1, 1000L);
row1.addField(2, Short.valueOf("1"));
@@ -126,7 +125,7 @@ public class BeamSqlDslBase {
row1.addField(9, new BigDecimal(1));
rows.add(row1);
- BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
+ BeamRecord row2 = new BeamRecord(rowTypeInTableA);
row2.addField(0, 2);
row2.addField(1, 2000L);
row2.addField(2, Short.valueOf("2"));
@@ -139,7 +138,7 @@ public class BeamSqlDslBase {
row2.addField(9, new BigDecimal(2));
rows.add(row2);
- BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
+ BeamRecord row3 = new BeamRecord(rowTypeInTableA);
row3.addField(0, 3);
row3.addField(1, 3000L);
row3.addField(2, Short.valueOf("3"));
@@ -152,7 +151,7 @@ public class BeamSqlDslBase {
row3.addField(9, new BigDecimal(3));
rows.add(row3);
- BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
+ BeamRecord row4 = new BeamRecord(rowTypeInTableA);
row4.addField(0, 4);
row4.addField(1, 4000L);
row4.addField(2, Short.valueOf("4"));
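
Beyond the rename, the notable change in this base class is that the coder is no longer constructed by hand: rowTypeInTableA.getRecordCoder() replaces every new BeamSqlRowCoder(rowTypeInTableA). A sketch of the bounded-input setup under that convention (class and method names here are illustrative):

    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    public class BoundedInputSketch {
      // The record type now carries its own coder, so callers never
      // instantiate a coder class directly.
      static PCollection<BeamRecord> boundedInput(
          Pipeline pipeline, BeamSqlRecordType rowType, List<BeamRecord> rows) {
        return PBegin.in(pipeline).apply("boundedInput",
            Create.of(rows).withCoder(rowType.getRecordCoder()));
      }
    }
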
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
index 16b6426..e1d463b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -44,10 +44,10 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
runSingleFilter(unboundedInput1);
}
- private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception {
+ private void runSingleFilter(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
input.apply("testSingleFilter", BeamSql.simpleQuery(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -71,12 +71,12 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
runCompositeFilter(unboundedInput1);
}
- private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception {
+ private void runCompositeFilter(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT * FROM TABLE_A"
+ " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testCompositeFilter", BeamSql.query(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2));
@@ -100,11 +100,11 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
runNoReturnFilter(unboundedInput1);
}
- private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception {
+ private void runNoReturnFilter(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT * FROM TABLE_A WHERE f_int < 1";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testNoReturnFilter", BeamSql.query(sql));
PAssert.that(result).empty();
@@ -120,8 +120,8 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM TABLE_B WHERE f_int < 1";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
.apply("testFromInvalidTableName1", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
@@ -135,7 +135,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM PCOLLECTION_NA";
- PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+ PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
}
@@ -148,7 +148,7 @@ public class BeamSqlDslFilterTest extends BeamSqlDslBase {
String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0";
- PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
+ PCollection<BeamRecord> result = boundedInput1.apply(BeamSql.simpleQuery(sql));
pipeline.run().waitUntilFinish();
}
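
Both query entry points appear in this file: BeamSql.simpleQuery binds a single PCollection as the table PCOLLECTION, while BeamSql.query reads named tables out of a PCollectionTuple. A sketch of the two, assuming the post-refactor signatures shown above (step names and SQL strings are illustrative):

    import org.apache.beam.sdk.extensions.sql.BeamSql;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class FilterQuerySketch {
      // Single-input form: the input is addressed as PCOLLECTION.
      static PCollection<BeamRecord> singleTable(PCollection<BeamRecord> input) {
        return input.apply("singleFilter",
            BeamSql.simpleQuery("SELECT * FROM PCOLLECTION WHERE f_int = 1"));
      }

      // Multi-input form: each PCollection is registered under a TupleTag name.
      static PCollection<BeamRecord> namedTable(PCollection<BeamRecord> input) {
        return PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
            .apply("compositeFilter",
                BeamSql.query("SELECT * FROM TABLE_A WHERE f_int > 1"));
      }
    }
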
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
index d75af9b..d5d0a24 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java
@@ -23,11 +23,11 @@ import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRelBoundedVsBo
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.coders.BeamRecordCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
@Rule
public final TestPipeline pipeline = TestPipeline.create();
- private static final BeamSqlRowType SOURCE_RECORD_TYPE =
- BeamSqlRowType.create(
+ private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
+ BeamSqlRecordType.create(
Arrays.asList(
"order_id", "site_id", "price"
),
@@ -51,11 +51,10 @@ public class BeamSqlDslJoinTest {
)
);
- private static final BeamSqlRowCoder SOURCE_CODER =
- new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
+ private static final BeamRecordCoder SOURCE_CODER = SOURCE_RECORD_TYPE.getRecordCoder();
- private static final BeamSqlRowType RESULT_RECORD_TYPE =
- BeamSqlRowType.create(
+ private static final BeamSqlRecordType RESULT_RECORD_TYPE =
+ BeamSqlRecordType.create(
Arrays.asList(
"order_id", "site_id", "price", "order_id0", "site_id0", "price0"
),
@@ -65,8 +64,7 @@ public class BeamSqlDslJoinTest {
)
);
- private static final BeamSqlRowCoder RESULT_CODER =
- new BeamSqlRowCoder(RESULT_RECORD_TYPE);
+ private static final BeamRecordCoder RESULT_CODER = RESULT_RECORD_TYPE.getRecordCoder();
@Test
public void testInnerJoin() throws Exception {
@@ -178,13 +176,13 @@ public class BeamSqlDslJoinTest {
pipeline.run();
}
- private PCollection<BeamSqlRow> queryFromOrderTables(String sql) {
+ private PCollection<BeamRecord> queryFromOrderTables(String sql) {
return PCollectionTuple
.of(
- new TupleTag<BeamSqlRow>("ORDER_DETAILS1"),
+ new TupleTag<BeamRecord>("ORDER_DETAILS1"),
ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER)
)
- .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"),
+ .and(new TupleTag<BeamRecord>("ORDER_DETAILS2"),
ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER)
).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER);
}
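
Here the two static coders are likewise derived from their record types via getRecordCoder(). A sketch of the tagged two-table plumbing behind queryFromOrderTables, with the inputs and the join SQL left as parameters (both are placeholders; the real sources come from mocked tables elsewhere in this file):

    import org.apache.beam.sdk.coders.BeamRecordCoder;
    import org.apache.beam.sdk.extensions.sql.BeamSql;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class JoinSketch {
      // Tag each input with the table name the SQL refers to, run the query,
      // then pin the output coder derived from the result record type.
      static PCollection<BeamRecord> joinOrderTables(
          PCollection<BeamRecord> details1, PCollection<BeamRecord> details2,
          String sql, BeamRecordCoder resultCoder) {
        return PCollectionTuple
            .of(new TupleTag<BeamRecord>("ORDER_DETAILS1"), details1)
            .and(new TupleTag<BeamRecord>("ORDER_DETAILS2"), details2)
            .apply("join", BeamSql.query(sql))
            .setCoder(resultCoder);
      }
    }
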
http://git-wip-us.apache.org/repos/asf/beam/blob/89109b8c/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index 6468011..ddb90d5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.extensions.sql;
import java.sql.Types;
import java.util.Arrays;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
@@ -47,10 +47,10 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
runSelectAll(unboundedInput2);
}
- private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception {
+ private void runSelectAll(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT * FROM PCOLLECTION";
- PCollection<BeamSqlRow> result =
+ PCollection<BeamRecord> result =
input.apply("testSelectAll", BeamSql.simpleQuery(sql));
PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0));
@@ -74,17 +74,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
runPartialFields(unboundedInput2);
}
- private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception {
+ private void runPartialFields(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFields", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
@@ -109,29 +109,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
runPartialFieldsInMultipleRow(unboundedInput1);
}
- private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception {
+ private void runPartialFieldsInMultipleRow(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
- BeamSqlRow record3 = new BeamSqlRow(resultType);
+ BeamRecord record3 = new BeamRecord(resultType);
record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
- BeamSqlRow record4 = new BeamSqlRow(resultType);
+ BeamRecord record4 = new BeamRecord(resultType);
record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
@@ -156,29 +156,29 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
runPartialFieldsInRows(unboundedInput1);
}
- private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception {
+ private void runPartialFieldsInRows(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT f_int, f_long FROM TABLE_A";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testPartialFieldsInRows", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
Arrays.asList(Types.INTEGER, Types.BIGINT));
- BeamSqlRow record1 = new BeamSqlRow(resultType);
+ BeamRecord record1 = new BeamRecord(resultType);
record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
- BeamSqlRow record2 = new BeamSqlRow(resultType);
+ BeamRecord record2 = new BeamRecord(resultType);
record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
- BeamSqlRow record3 = new BeamSqlRow(resultType);
+ BeamRecord record3 = new BeamRecord(resultType);
record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
- BeamSqlRow record4 = new BeamSqlRow(resultType);
+ BeamRecord record4 = new BeamRecord(resultType);
record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
@@ -203,17 +203,17 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
runLiteralField(unboundedInput2);
}
- public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception {
+ public void runLiteralField(PCollection<BeamRecord> input) throws Exception {
String sql = "SELECT 1 as literal_field FROM TABLE_A";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
.apply("testLiteralField", BeamSql.query(sql));
- BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
+ BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
Arrays.asList(Types.INTEGER));
- BeamSqlRow record = new BeamSqlRow(resultType);
+ BeamRecord record = new BeamRecord(resultType);
record.addField("literal_field", 1);
PAssert.that(result).containsInAnyOrder(record);
@@ -229,8 +229,8 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
String sql = "SELECT f_int_na FROM TABLE_A";
- PCollection<BeamSqlRow> result =
- PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1)
+ PCollection<BeamRecord> result =
+ PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
.apply("testProjectUnknownField", BeamSql.query(sql));
pipeline.run().waitUntilFinish();
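
The projection tests above all follow one shape: run the query, rebuild the expected rows against the narrower projected schema, and compare with PAssert. A condensed sketch of that shape, assuming only the calls visible in this diff (class and method names, and the literal field values, are illustrative):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.sdk.extensions.sql.BeamSql;
    import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.values.BeamRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class ProjectionSketch {
      static void assertPartialFields(PCollection<BeamRecord> input) {
        PCollection<BeamRecord> result =
            PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
                .apply("partialFields",
                    BeamSql.query("SELECT f_int, f_long FROM TABLE_A"));

        // Expected rows carry only the projected fields.
        BeamSqlRecordType resultType = BeamSqlRecordType.create(
            Arrays.asList("f_int", "f_long"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));
        BeamRecord expected = new BeamRecord(resultType);
        expected.addField("f_int", 1);
        expected.addField("f_long", 1000L);

        PAssert.that(result).containsInAnyOrder(expected);
      }
    }
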