You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/08 19:35:04 UTC

[jira] [Commented] (BEAM-3076) support TIMESTAMP in BeamRecordType

    [ https://issues.apache.org/jira/browse/BEAM-3076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316861#comment-16316861 ] 

ASF GitHub Bot commented on BEAM-3076:
--------------------------------------

XuMingmin closed pull request #4036: [BEAM-3076] support TIMESTAMP in BeamRecordType
URL: https://github.com/apache/beam/pull/4036
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
index 982494ad2e5..8dc85646dd1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamRecordSqlType.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +39,7 @@
 import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.FloatCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.ShortCoder;
 import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.TimeCoder;
+import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper.TimestampCoder;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.BeamRecordType;
 
@@ -68,7 +70,7 @@
     SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
 
     SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Timestamp.class);
   }
 
   public List<Integer> fieldTypes;
@@ -120,9 +122,11 @@ public static BeamRecordSqlType create(List<String> fieldNames,
         fieldCoders.add(TimeCoder.of());
         break;
       case Types.DATE:
-      case Types.TIMESTAMP:
         fieldCoders.add(DateCoder.of());
         break;
+      case Types.TIMESTAMP:
+        fieldCoders.add(TimestampCoder.of());
+        break;
       case Types.BOOLEAN:
         fieldCoders.add(BooleanCoder.of());
         break;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
index 870165d70fa..c6c2681adfe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlRecordHelper.java
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -187,6 +188,34 @@ public void verifyDeterministic() throws NonDeterministicException {
     }
   }
 
+  /**
+   * {@link Coder} for {@link Timestamp}: big-endian epoch millis, so sub-ms nanos are dropped.
+   */
+  public static class TimestampCoder extends CustomCoder<Timestamp> {
+    private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+    private static final TimestampCoder INSTANCE = new TimestampCoder();
+
+    public static TimestampCoder of() {
+      return INSTANCE;
+    }
+
+    private TimestampCoder() { // singleton; obtain via of()
+    }
+
+    @Override
+    public void encode(Timestamp value, OutputStream outStream) throws CoderException, IOException {
+      longCoder.encode(value.getTime(), outStream); // epoch millis; value must be non-null
+    }
+
+    @Override
+    public Timestamp decode(InputStream inStream) throws CoderException, IOException {
+      return new Timestamp(longCoder.decode(inStream)); // nanos restored only to ms precision
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    } // no-op: a fixed-width long encoding is deterministic
+  }
   /**
    * {@link Coder} for Java type {@link Boolean}.
    */
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
index ad15f9892d3..b90a5186766 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
+import java.sql.Timestamp;
 import java.util.Date;
 import java.util.Iterator;
 import javax.annotation.Nullable;
@@ -62,8 +63,10 @@ public static CombineFn createMax(SqlTypeName fieldType) {
         return new CustMax<Float>();
       case DOUBLE:
         return Max.ofDoubles();
-      case TIMESTAMP:
+      case DATE:
         return new CustMax<Date>();
+      case TIMESTAMP:
+        return new TimestampMax();
       case DECIMAL:
         return new CustMax<BigDecimal>();
       default:
@@ -89,8 +92,10 @@ public static CombineFn createMin(SqlTypeName fieldType) {
         return new CustMin<Float>();
       case DOUBLE:
         return Min.ofDoubles();
-      case TIMESTAMP:
+      case DATE:
         return new CustMin<Date>();
+      case TIMESTAMP:
+        return new TimestampMin();
       case DECIMAL:
         return new CustMin<BigDecimal>();
       default:
@@ -186,6 +191,21 @@ public T apply(T left, T right) {
     }
   }
 
+  // Timestamp implements Comparable<Date>, so it can't bind T in CustMin/CustMax.
+  // TimestampMax/TimestampMin skip a null operand (SQL MAX/MIN ignore NULLs) and
+  // otherwise pick the larger/smaller value via Timestamp.compareTo(Timestamp).
+  static class TimestampMax extends Combine.BinaryCombineFn<Timestamp> {
+    public Timestamp apply(Timestamp left, Timestamp right) {
+      return (right == null || (left != null && right.compareTo(left) < 0)) ? left : right;
+    }
+  }
+
+  static class TimestampMin extends Combine.BinaryCombineFn<Timestamp> {
+    public Timestamp apply(Timestamp left, Timestamp right) {
+      return (right == null || (left != null && left.compareTo(right) < 0)) ? left : right;
+    }
+  }
+
   static class ShortSum extends Combine.BinaryCombineFn<Short> {
     public Short apply(Short left, Short right) {
       return (short) (left + right);
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 76d23138dca..f38cab06158 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -141,7 +142,8 @@ private void runAggregationFunctions(PCollection<BeamRecord> input) throws Excep
         + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4, "
         + "sum(f_double) as sum5, avg(f_double) as avg5, "
         + "max(f_double) as max5, min(f_double) as min5, "
-        + "max(f_timestamp) as max6, min(f_timestamp) as min6, "
+        + "max(f_date) as max6, min(f_date) as min6, "
+        + "max(f_timestamp) as max7, min(f_timestamp) as min7, "
         + "var_pop(f_double) as varpop1, var_samp(f_double) as varsamp1, "
         + "var_pop(f_int) as varpop2, var_samp(f_int) as varsamp2 "
         + "FROM TABLE_A group by f_int2";
@@ -151,14 +153,18 @@ private void runAggregationFunctions(PCollection<BeamRecord> input) throws Excep
         .apply("testAggregationFunctions", BeamSql.queryMulti(sql));
 
     BeamRecordSqlType resultType = BeamRecordSqlType.create(
-        Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2",
-            "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5",
-            "max5", "min5", "max6", "min6",
-            "varpop1", "varsamp1", "varpop2", "varsamp2"),
+        Arrays.asList("f_int2", "size", "sum1", "avg1", "max1",
+                          "min1", "sum2", "avg2", "max2", "min2",
+                          "sum3", "avg3", "max3", "min3", "sum4",
+                          "avg4", "max4", "min4", "sum5", "avg5",
+                          "max5", "min5", "max6", "min6",
+                          "max7", "min7",
+                          "varpop1", "varsamp1", "varpop2", "varsamp2"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
             Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
-            Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT,
-            Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE,
+            Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT,
+            Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE,
+            Types.DOUBLE, Types.DOUBLE, Types.DATE, Types.DATE,
             Types.TIMESTAMP, Types.TIMESTAMP,
             Types.DOUBLE, Types.DOUBLE, Types.INTEGER, Types.INTEGER));
 
@@ -170,6 +176,8 @@ private void runAggregationFunctions(PCollection<BeamRecord> input) throws Excep
         , 10.0F, 2.5F, 4.0F, 1.0F
         , 10.0, 2.5, 4.0, 1.0
         , FORMAT.parse("2017-01-01 02:04:03"), FORMAT.parse("2017-01-01 01:01:03")
+        , new Timestamp(FORMAT.parse("2017-01-01 02:04:03").getTime())
+        , new Timestamp(FORMAT.parse("2017-01-01 01:01:03").getTime())
         , 1.25, 1.666666667, 1, 1);
 
     PAssert.that(result).containsInAnyOrder(record);
@@ -273,9 +281,9 @@ public void testTumbleWindowWithUnbounded() throws Exception {
 
   private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
-        + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
+        + " TUMBLE_START(f_date, INTERVAL '1' HOUR) AS `window_start`"
         + " FROM TABLE_A"
-        + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
+        + " GROUP BY f_int2, TUMBLE(f_date, INTERVAL '1' HOUR)";
     PCollection<BeamRecord> result =
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.queryMulti(sql));
@@ -310,9 +318,9 @@ public void testHopWindowWithUnbounded() throws Exception {
 
   private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
-        + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
+        + " HOP_START(f_date, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
         + " FROM PCOLLECTION"
-        + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
+        + " GROUP BY f_int2, HOP(f_date, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
     PCollection<BeamRecord> result =
         input.apply("testHopWindow", BeamSql.query(sql));
 
@@ -348,9 +356,9 @@ public void testSessionWindowWithUnbounded() throws Exception {
 
   private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
     String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
-        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
+        + " SESSION_START(f_date, INTERVAL '5' MINUTE) AS `window_start`"
         + " FROM TABLE_A"
-        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
+        + " GROUP BY f_int2, SESSION(f_date, INTERVAL '5' MINUTE)";
     PCollection<BeamRecord> result =
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.queryMulti(sql));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index b27435c9d2c..9a851c9f5f0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -65,10 +66,12 @@
   @BeforeClass
   public static void prepareClass() throws ParseException {
     rowTypeInTableA = BeamRecordSqlType.create(
-        Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string",
-            "f_timestamp", "f_int2", "f_decimal"),
-        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
-            Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL));
+        Arrays.asList("f_int", "f_long", "f_short", "f_byte",
+                          "f_float", "f_double", "f_string", "f_date",
+                          "f_timestamp", "f_int2", "f_decimal"),
+        Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT,
+                      Types.FLOAT, Types.DOUBLE, Types.VARCHAR, Types.DATE,
+                      Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL));
 
     recordsInTableA = prepareInputRowsInTableA();
   }
@@ -90,7 +93,7 @@ public void preparePCollections(){
         .create(rowTypeInTableA.getRecordCoder());
 
     for (BeamRecord row : recordsInTableA) {
-      values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+      values = values.advanceWatermarkTo(new Instant(row.getDate("f_date")));
       values = values.addElements(row);
     }
 
@@ -102,7 +105,7 @@ public void preparePCollections(){
         .create(rowTypeInTableA.getRecordCoder());
 
     BeamRecord row = recordsInTableA.get(0);
-    values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
+    values = values.advanceWatermarkTo(new Instant(row.getDate("f_date")));
     values = values.addElements(row);
 
     return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity());
@@ -113,22 +116,30 @@ public void preparePCollections(){
 
     BeamRecord row1 = new BeamRecord(rowTypeInTableA
         , 1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0f, 1.0, "string_row1"
-        , FORMAT.parse("2017-01-01 01:01:03"), 0, new BigDecimal(1));
+        , FORMAT.parse("2017-01-01 01:01:03")
+        , new Timestamp(FORMAT.parse("2017-01-01 01:01:03").getTime())
+        , 0, new BigDecimal(1));
     rows.add(row1);
 
     BeamRecord row2 = new BeamRecord(rowTypeInTableA
         , 2, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0f, 2.0, "string_row2"
-        , FORMAT.parse("2017-01-01 01:02:03"), 0, new BigDecimal(2));
+        , FORMAT.parse("2017-01-01 01:02:03")
+        , new Timestamp(FORMAT.parse("2017-01-01 01:02:03").getTime())
+        , 0, new BigDecimal(2));
     rows.add(row2);
 
     BeamRecord row3 = new BeamRecord(rowTypeInTableA
         , 3, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0f, 3.0, "string_row3"
-        , FORMAT.parse("2017-01-01 01:06:03"), 0, new BigDecimal(3));
+        , FORMAT.parse("2017-01-01 01:06:03")
+        , new Timestamp(FORMAT.parse("2017-01-01 01:06:03").getTime())
+        , 0, new BigDecimal(3));
     rows.add(row3);
 
     BeamRecord row4 = new BeamRecord(rowTypeInTableA
         , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "string_row4"
-        , FORMAT.parse("2017-01-01 02:04:03"), 0, new BigDecimal(4));
+        , FORMAT.parse("2017-01-01 02:04:03")
+        , new Timestamp(FORMAT.parse("2017-01-01 02:04:03").getTime())
+        , 0, new BigDecimal(4));
     rows.add(row4);
 
     return rows;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> support TIMESTAMP in BeamRecordType
> -----------------------------------
>
>                 Key: BEAM-3076
>                 URL: https://issues.apache.org/jira/browse/BEAM-3076
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Shayang Zang
>            Assignee: Shayang Zang
>            Priority: Minor
>
> Timestamp data was mapped to Date.class and shared DateCoder() during BeamSql execution. We want TIMESTAMP to be supported in BeamRecordType as a stand-alone data type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)