You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/24 09:53:00 UTC
[1/2] beam git commit: [BEAM-2310] Support encoding/decoding of TIME and new DECIMAL data type
Repository: beam
Updated Branches:
refs/heads/DSL_SQL db982cfe1 -> dedabff1f
[BEAM-2310] Support encoding/decoding of TIME and new DECIMAL data type
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23a037e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23a037e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23a037e8
Branch: refs/heads/DSL_SQL
Commit: 23a037e8fe98aeab176d3aceea94df02536f185c
Parents: db982cf
Author: James Xu <xu...@gmail.com>
Authored: Wed May 17 22:48:00 2017 +0800
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 24 11:51:45 2017 +0200
----------------------------------------------------------------------
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 268 ++++++++++---------
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 23 +-
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 37 ++-
3 files changed, 193 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index bc75eb1..ca045c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -18,6 +18,7 @@
package org.apache.beam.dsls.sql.schema;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.GregorianCalendar;
@@ -84,63 +85,69 @@ public class BeamSQLRow implements Serializable {
SqlTypeName fieldType = dataType.getFieldsType().get(index);
switch (fieldType) {
- case INTEGER:
- if (!(fieldValue instanceof Integer)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case SMALLINT:
- if (!(fieldValue instanceof Short)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case TINYINT:
- if (!(fieldValue instanceof Byte)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case DOUBLE:
- if (!(fieldValue instanceof Double)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case BIGINT:
- if (!(fieldValue instanceof Long)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case FLOAT:
- if (!(fieldValue instanceof Float)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case VARCHAR:
- case CHAR:
- if (!(fieldValue instanceof String)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case TIME:
- if (!(fieldValue instanceof GregorianCalendar)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- case TIMESTAMP:
- if (!(fieldValue instanceof Date)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- }
- break;
- default:
- throw new UnsupportedDataTypeException(fieldType);
+ case INTEGER:
+ if (!(fieldValue instanceof Integer)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case SMALLINT:
+ if (!(fieldValue instanceof Short)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TINYINT:
+ if (!(fieldValue instanceof Byte)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case DOUBLE:
+ if (!(fieldValue instanceof Double)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case BIGINT:
+ if (!(fieldValue instanceof Long)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case FLOAT:
+ if (!(fieldValue instanceof Float)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case DECIMAL:
+ if (!(fieldValue instanceof BigDecimal)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case VARCHAR:
+ case CHAR:
+ if (!(fieldValue instanceof String)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TIME:
+ if (!(fieldValue instanceof GregorianCalendar)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ case TIMESTAMP:
+ if (!(fieldValue instanceof Date)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ }
+ break;
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
}
dataValues.set(index, fieldValue);
}
@@ -177,6 +184,14 @@ public class BeamSQLRow implements Serializable {
return (Date) getFieldValue(idx);
}
+ public GregorianCalendar getGregorianCalendar(int idx) {
+ return (GregorianCalendar) getFieldValue(idx);
+ }
+
+ public BigDecimal getBigDecimal(int idx) {
+ return (BigDecimal) getFieldValue(idx);
+ }
+
public Object getFieldValue(String fieldName) {
return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
}
@@ -190,72 +205,79 @@ public class BeamSQLRow implements Serializable {
SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
switch (fieldType) {
- case INTEGER:
- if (!(fieldValue instanceof Integer)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case SMALLINT:
- if (!(fieldValue instanceof Short)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TINYINT:
- if (!(fieldValue instanceof Byte)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case DOUBLE:
- if (!(fieldValue instanceof Double)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case BIGINT:
- if (!(fieldValue instanceof Long)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case FLOAT:
- if (!(fieldValue instanceof Float)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case VARCHAR:
- case CHAR:
- if (!(fieldValue instanceof String)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TIME:
- if (!(fieldValue instanceof GregorianCalendar)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- case TIMESTAMP:
- if (!(fieldValue instanceof Date)) {
- throw new InvalidFieldException(
- String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
- } else {
- return fieldValue;
- }
- default:
- throw new UnsupportedDataTypeException(fieldType);
+ case INTEGER:
+ if (!(fieldValue instanceof Integer)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case SMALLINT:
+ if (!(fieldValue instanceof Short)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case TINYINT:
+ if (!(fieldValue instanceof Byte)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case DOUBLE:
+ if (!(fieldValue instanceof Double)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case DECIMAL:
+ if (!(fieldValue instanceof BigDecimal)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case BIGINT:
+ if (!(fieldValue instanceof Long)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case FLOAT:
+ if (!(fieldValue instanceof Float)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case VARCHAR:
+ case CHAR:
+ if (!(fieldValue instanceof String)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case TIME:
+ if (!(fieldValue instanceof GregorianCalendar)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ case TIMESTAMP:
+ if (!(fieldValue instanceof Date)) {
+ throw new InvalidFieldException(
+ String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+ } else {
+ return fieldValue;
+ }
+ default:
+ throw new UnsupportedDataTypeException(fieldType);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index bfcb487..0bfe467 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -21,7 +21,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;
+import java.util.GregorianCalendar;
import java.util.List;
+
+import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -46,6 +49,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
private static final DoubleCoder doubleCoder = DoubleCoder.of();
private static final InstantCoder instantCoder = InstantCoder.of();
+ private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
private BeamSqlRowCoder(){}
@@ -81,6 +85,9 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
case FLOAT:
doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
break;
+ case DECIMAL:
+ bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested());
+ break;
case BIGINT:
longCoder.encode(value.getLong(idx), outStream, context.nested());
break;
@@ -88,8 +95,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
case CHAR:
stringCoder.encode(value.getString(idx), outStream, context.nested());
break;
+ case TIME:
+ longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(),
+ outStream, context.nested());
+ break;
case TIMESTAMP:
- longCoder.encode(value.getDate(idx).getTime(), outStream, context);
+ longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested());
break;
default:
@@ -134,12 +145,20 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
case BIGINT:
record.addField(idx, longCoder.decode(inStream, context.nested()));
break;
+ case DECIMAL:
+ record.addField(idx, bigDecimalCoder.decode(inStream, context.nested()));
+ break;
case VARCHAR:
case CHAR:
record.addField(idx, stringCoder.decode(inStream, context.nested()));
break;
+ case TIME:
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date(longCoder.decode(inStream, context.nested())));
+ record.addField(idx, calendar);
+ break;
case TIMESTAMP:
- record.addField(idx, new Date(longCoder.decode(inStream, context)));
+ record.addField(idx, new Date(longCoder.decode(inStream, context.nested())));
break;
default:
http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index f207794..bc6343b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -18,6 +18,10 @@
package org.apache.beam.dsls.sql.schema;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
@@ -38,11 +42,16 @@ public class BeamSqlRowCoderTest {
@Override
public RelDataType apply(RelDataTypeFactory a0) {
return a0.builder()
- .add("id", SqlTypeName.INTEGER)
- .add("order_id", SqlTypeName.BIGINT)
- .add("price", SqlTypeName.FLOAT)
- .add("amount", SqlTypeName.DOUBLE)
- .add("user_name", SqlTypeName.VARCHAR)
+ .add("col_tinyint", SqlTypeName.TINYINT)
+ .add("col_smallint", SqlTypeName.SMALLINT)
+ .add("col_integer", SqlTypeName.INTEGER)
+ .add("col_bigint", SqlTypeName.BIGINT)
+ .add("col_float", SqlTypeName.FLOAT)
+ .add("col_double", SqlTypeName.DOUBLE)
+ .add("col_decimal", SqlTypeName.DECIMAL)
+ .add("col_string_varchar", SqlTypeName.VARCHAR)
+ .add("col_time", SqlTypeName.TIME)
+ .add("col_timestamp", SqlTypeName.TIMESTAMP)
.build();
}
};
@@ -51,11 +60,19 @@ public class BeamSqlRowCoderTest {
protoRowType.apply(new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT)));
BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
- row.addField(0, 1);
- row.addField(1, 1L);
- row.addField(2, 1.1F);
- row.addField(3, 1.1);
- row.addField(4, "hello");
+ row.addField("col_tinyint", Byte.valueOf("1"));
+ row.addField("col_smallint", Short.valueOf("1"));
+ row.addField("col_integer", 1);
+ row.addField("col_bigint", 1L);
+ row.addField("col_float", 1.1F);
+ row.addField("col_double", 1.1);
+ row.addField("col_decimal", BigDecimal.ZERO);
+ row.addField("col_string_varchar", "hello");
+ GregorianCalendar calendar = new GregorianCalendar();
+ calendar.setTime(new Date());
+ row.addField("col_time", calendar);
+ row.addField("col_timestamp", new Date());
+
BeamSqlRowCoder coder = BeamSqlRowCoder.of();
CoderProperties.coderDecodeEncodeEqual(coder, row);
[2/2] beam git commit: [BEAM-2310] This closes #3168
Posted by jb...@apache.org.
[BEAM-2310] This closes #3168
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dedabff1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dedabff1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dedabff1
Branch: refs/heads/DSL_SQL
Commit: dedabff1f5aa473a76ef2575466dda4fa6f5a205
Parents: db982cf 23a037e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 24 11:52:54 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 24 11:52:54 2017 +0200
----------------------------------------------------------------------
.../apache/beam/dsls/sql/schema/BeamSQLRow.java | 268 ++++++++++---------
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 23 +-
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 37 ++-
3 files changed, 193 insertions(+), 135 deletions(-)
----------------------------------------------------------------------