You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/12 17:35:25 UTC

[1/2] beam git commit: CAST operator supporting numeric, date and timestamp types

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 53d27e6c4 -> 125cef144


CAST operator supporting numeric, date and timestamp types


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39eedd56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39eedd56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39eedd56

Branch: refs/heads/DSL_SQL
Commit: 39eedd566c3a1825ec3e9be0aa4490cfd824cb30
Parents: 53d27e6
Author: tarushapptech <ta...@gmail.com>
Authored: Sat Jun 17 13:20:02 2017 +0530
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:34:10 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |   4 +
 .../operator/BeamSqlCastExpression.java         | 132 +++++++++++++++++++
 .../operator/BeamSqlCastExpressionTest.java     | 126 ++++++++++++++++++
 3 files changed, 262 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 4678da5..2c2efe9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
@@ -333,6 +334,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
         case "CASE":
           ret = new BeamSqlCaseExpression(subExps);
           break;
+        case "CAST":
+          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+          break;
 
         case "IS NULL":
           ret = new BeamSqlIsNullExpression(subExps.get(0));

http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..7e8ab03
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Expression for SQL 'CAST' to numeric, character, date and timestamp {@link SqlTypeName}s.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+  private static final int index = 0;
+  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+  private static final String outputDateFormat = "yyyy-MM-dd";
+  /**
+   * Input formats accepted when parsing a value being cast to
+   * {@link SqlTypeName#DATE} or {@link SqlTypeName#TIMESTAMP}.
+   */
+  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+      .append(null/*printer*/, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+      .withPivotYear(2020);
+
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRecord)));
+      case DOUBLE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRecord)));
+      case SMALLINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRecord)));
+      case TINYINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRecord)));
+      case BIGINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRecord)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRecord)));
+      case FLOAT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRecord)));
+      case CHAR:
+      case VARCHAR:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRecord).toString());
+      case DATE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRecord), outputDateFormat));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(index, inputRecord), outputTimestampFormat));
+    }
+    throw new UnsupportedOperationException(
+        String.format("Cast to type %s not supported", castOutputType));
+  }
+
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date
+          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Date'");
+    }
+  }
+
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+              .roundCeilingCopy().toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..c2fd68d
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link BeamSqlCastExpression}.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  private List<BeamSqlExpression> operands;
+
+  @Before
+  public void setup() {
+    operands = new ArrayList<>();
+  }
+
+  @Test
+  public void testForOperands() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
+    Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
+  }
+
+  @Test
+  public void testForIntegerToBigintTypeCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertEquals(5L,
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForDoubleToBigIntCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
+    Assert.assertEquals(5L,
+        new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).evaluate(record).getLong());
+  }
+
+  @Test
+  public void testForIntegerToDateCast() {
+    // test for yyyyMMdd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testyyyyMMddDateFormat() {
+    // test for yyyy-MM-dd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testyyMMddDateFormat() {
+    // test for yy.MM.dd format
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"),
+        new BeamSqlCastExpression(operands, SqlTypeName.DATE).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testForTimestampCastExpression() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
+    Assert.assertEquals(SqlTypeName.TIMESTAMP,
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record)
+            .getOutputType());
+  }
+
+  @Test
+  public void testDateTimeFormatWithMillis() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormatWithTimezone() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormat() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testForCastTypeNotSupported() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        new BeamSqlCastExpression(operands, SqlTypeName.TIMESTAMP).evaluate(record).getValue());
+  }
+
+}


[2/2] beam git commit: [BEAM-2424] This closes #3386

Posted by ta...@apache.org.
[BEAM-2424] This closes #3386


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/125cef14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/125cef14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/125cef14

Branch: refs/heads/DSL_SQL
Commit: 125cef14453d625f1758aef5b129182cb13d829c
Parents: 53d27e6 39eedd5
Author: Tyler Akidau <ta...@apache.org>
Authored: Wed Jul 12 10:34:36 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:34:36 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |   4 +
 .../operator/BeamSqlCastExpression.java         | 132 +++++++++++++++++++
 .../operator/BeamSqlCastExpressionTest.java     | 126 ++++++++++++++++++
 3 files changed, 262 insertions(+)
----------------------------------------------------------------------