You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/05/24 09:53:00 UTC

[1/2] beam git commit: [BEAM-2310] Support encoding/decoding of TIME and new DECIMAL data type

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL db982cfe1 -> dedabff1f


[BEAM-2310] Support encoding/decoding of TIME and new DECIMAL data type


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23a037e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23a037e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23a037e8

Branch: refs/heads/DSL_SQL
Commit: 23a037e8fe98aeab176d3aceea94df02536f185c
Parents: db982cf
Author: James Xu <xu...@gmail.com>
Authored: Wed May 17 22:48:00 2017 +0800
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 24 11:51:45 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 268 ++++++++++---------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  23 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |  37 ++-
 3 files changed, 193 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index bc75eb1..ca045c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -18,6 +18,7 @@
 package org.apache.beam.dsls.sql.schema;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.GregorianCalendar;
@@ -84,63 +85,69 @@ public class BeamSQLRow implements Serializable {
 
     SqlTypeName fieldType = dataType.getFieldsType().get(index);
     switch (fieldType) {
-    case INTEGER:
-      if (!(fieldValue instanceof Integer)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case SMALLINT:
-      if (!(fieldValue instanceof Short)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case TINYINT:
-      if (!(fieldValue instanceof Byte)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case DOUBLE:
-      if (!(fieldValue instanceof Double)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case BIGINT:
-      if (!(fieldValue instanceof Long)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case FLOAT:
-      if (!(fieldValue instanceof Float)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case VARCHAR:
-    case CHAR:
-      if (!(fieldValue instanceof String)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case TIME:
-      if (!(fieldValue instanceof GregorianCalendar)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case TIMESTAMP:
-      if (!(fieldValue instanceof Date)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    default:
-      throw new UnsupportedDataTypeException(fieldType);
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
     }
     dataValues.set(index, fieldValue);
   }
@@ -177,6 +184,14 @@ public class BeamSQLRow implements Serializable {
     return (Date) getFieldValue(idx);
   }
 
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
   public Object getFieldValue(String fieldName) {
     return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
   }
@@ -190,72 +205,79 @@ public class BeamSQLRow implements Serializable {
     SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
 
     switch (fieldType) {
-    case INTEGER:
-      if (!(fieldValue instanceof Integer)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case SMALLINT:
-      if (!(fieldValue instanceof Short)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case TINYINT:
-      if (!(fieldValue instanceof Byte)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case DOUBLE:
-      if (!(fieldValue instanceof Double)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case BIGINT:
-      if (!(fieldValue instanceof Long)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case FLOAT:
-      if (!(fieldValue instanceof Float)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case VARCHAR:
-    case CHAR:
-      if (!(fieldValue instanceof String)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case TIME:
-      if (!(fieldValue instanceof GregorianCalendar)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    case TIMESTAMP:
-      if (!(fieldValue instanceof Date)) {
-        throw new InvalidFieldException(
-            String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      } else {
-        return fieldValue;
-      }
-    default:
-      throw new UnsupportedDataTypeException(fieldType);
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index bfcb487..0bfe467 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -21,7 +21,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Date;
+import java.util.GregorianCalendar;
 import java.util.List;
+
+import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -46,6 +49,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
   private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
   private static final DoubleCoder doubleCoder = DoubleCoder.of();
   private static final InstantCoder instantCoder = InstantCoder.of();
+  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
 
   private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
   private BeamSqlRowCoder(){}
@@ -81,6 +85,9 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
         case FLOAT:
           doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested());
           break;
+        case DECIMAL:
+          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested());
+          break;
         case BIGINT:
           longCoder.encode(value.getLong(idx), outStream, context.nested());
           break;
@@ -88,8 +95,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
         case CHAR:
           stringCoder.encode(value.getString(idx), outStream, context.nested());
           break;
+        case TIME:
+          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(),
+              outStream, context.nested());
+          break;
         case TIMESTAMP:
-          longCoder.encode(value.getDate(idx).getTime(), outStream, context);
+          longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested());
           break;
 
         default:
@@ -134,12 +145,20 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
         case BIGINT:
           record.addField(idx, longCoder.decode(inStream, context.nested()));
           break;
+        case DECIMAL:
+          record.addField(idx, bigDecimalCoder.decode(inStream, context.nested()));
+          break;
         case VARCHAR:
         case CHAR:
           record.addField(idx, stringCoder.decode(inStream, context.nested()));
           break;
+        case TIME:
+          GregorianCalendar calendar = new GregorianCalendar();
+          calendar.setTime(new Date(longCoder.decode(inStream, context.nested())));
+          record.addField(idx, calendar);
+          break;
         case TIMESTAMP:
-          record.addField(idx, new Date(longCoder.decode(inStream, context)));
+          record.addField(idx, new Date(longCoder.decode(inStream, context.nested())));
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/beam/blob/23a037e8/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index f207794..bc6343b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.beam.dsls.sql.schema;
 
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
@@ -38,11 +42,16 @@ public class BeamSqlRowCoderTest {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
         return a0.builder()
-            .add("id", SqlTypeName.INTEGER)
-            .add("order_id", SqlTypeName.BIGINT)
-            .add("price", SqlTypeName.FLOAT)
-            .add("amount", SqlTypeName.DOUBLE)
-            .add("user_name", SqlTypeName.VARCHAR)
+            .add("col_tinyint", SqlTypeName.TINYINT)
+            .add("col_smallint", SqlTypeName.SMALLINT)
+            .add("col_integer", SqlTypeName.INTEGER)
+            .add("col_bigint", SqlTypeName.BIGINT)
+            .add("col_float", SqlTypeName.FLOAT)
+            .add("col_double", SqlTypeName.DOUBLE)
+            .add("col_decimal", SqlTypeName.DECIMAL)
+            .add("col_string_varchar", SqlTypeName.VARCHAR)
+            .add("col_time", SqlTypeName.TIME)
+            .add("col_timestamp", SqlTypeName.TIMESTAMP)
             .build();
       }
     };
@@ -51,11 +60,19 @@ public class BeamSqlRowCoderTest {
         protoRowType.apply(new JavaTypeFactoryImpl(
         RelDataTypeSystem.DEFAULT)));
     BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
-    row.addField(0, 1);
-    row.addField(1, 1L);
-    row.addField(2, 1.1F);
-    row.addField(3, 1.1);
-    row.addField(4, "hello");
+    row.addField("col_tinyint", Byte.valueOf("1"));
+    row.addField("col_smallint", Short.valueOf("1"));
+    row.addField("col_integer", 1);
+    row.addField("col_bigint", 1L);
+    row.addField("col_float", 1.1F);
+    row.addField("col_double", 1.1);
+    row.addField("col_decimal", BigDecimal.ZERO);
+    row.addField("col_string_varchar", "hello");
+    GregorianCalendar calendar = new GregorianCalendar();
+    calendar.setTime(new Date());
+    row.addField("col_time", calendar);
+    row.addField("col_timestamp", new Date());
+
 
     BeamSqlRowCoder coder = BeamSqlRowCoder.of();
     CoderProperties.coderDecodeEncodeEqual(coder, row);


[2/2] beam git commit: [BEAM-2310] This closes #3168

Posted by jb...@apache.org.
[BEAM-2310] This closes #3168


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dedabff1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dedabff1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dedabff1

Branch: refs/heads/DSL_SQL
Commit: dedabff1f5aa473a76ef2575466dda4fa6f5a205
Parents: db982cf 23a037e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Wed May 24 11:52:54 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Wed May 24 11:52:54 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 268 ++++++++++---------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  23 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |  37 ++-
 3 files changed, 193 insertions(+), 135 deletions(-)
----------------------------------------------------------------------