You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/12 17:35:25 UTC
[1/2] beam git commit: CAST operator supporting numeric, date and timestamp types
Repository: beam
Updated Branches:
refs/heads/DSL_SQL 53d27e6c4 -> 125cef144
CAST operator supporting numeric, date and timestamp types
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39eedd56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39eedd56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39eedd56
Branch: refs/heads/DSL_SQL
Commit: 39eedd566c3a1825ec3e9be0aa4490cfd824cb30
Parents: 53d27e6
Author: tarushapptech <ta...@gmail.com>
Authored: Sat Jun 17 13:20:02 2017 +0530
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:34:10 2017 -0700
----------------------------------------------------------------------
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 +
.../operator/BeamSqlCastExpression.java | 132 +++++++++++++++++++
.../operator/BeamSqlCastExpressionTest.java | 126 ++++++++++++++++++
3 files changed, 262 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 4678da5..2c2efe9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
@@ -333,6 +334,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
case "CASE":
ret = new BeamSqlCaseExpression(subExps);
break;
+ case "CAST":
+ ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+ break;
case "IS NULL":
ret = new BeamSqlIsNullExpression(subExps.get(0));
http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..7e8ab03
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Implements the SQL {@code CAST} operator for a single operand.
+ *
+ * <p>The target type is the {@link SqlTypeName} passed to the constructor (the expression's
+ * output type). Supported targets are INTEGER, DOUBLE, SMALLINT, TINYINT, BIGINT, DECIMAL, FLOAT,
+ * CHAR/VARCHAR, DATE and TIMESTAMP. Numeric targets delegate to Calcite's {@link SqlFunctions}.
+ * DATE and TIMESTAMP targets parse the operand's {@code toString()} against a fixed set of
+ * layouts. Any other target makes {@link #evaluate} throw {@link UnsupportedOperationException}.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+  /** Position of the single value being cast within the operand list. */
+  private static final int INDEX = 0;
+
+  /** Layout used to render a parsed value before handing it to {@link Timestamp#valueOf}. */
+  private static final String OUTPUT_TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+  /** Layout used to render a parsed value before handing it to {@link Date#valueOf}. */
+  private static final String OUTPUT_DATE_FORMAT = "yyyy-MM-dd";
+
+  /**
+   * Accepted input layouts for {@link SqlTypeName#DATE} and {@link SqlTypeName#TIMESTAMP} casts.
+   *
+   * <p>The formatter is built without a printer, so it is only used for parsing. Two-digit years
+   * ({@code yy}) are resolved relative to pivot year 2020 (see Joda-Time
+   * {@code DateTimeFormatter#withPivotYear}).
+   */
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+      .append(null /* printer */, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser()})
+      .toFormatter()
+      .withPivotYear(2020);
+
+  /**
+   * Creates a CAST of {@code operands} to {@code castType}.
+   *
+   * @param operands expressions to cast; {@link #accept()} requires exactly one
+   * @param castType target SQL type of the cast
+   */
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  /** A CAST is well-formed only when it has exactly one operand. */
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  /**
+   * Evaluates the operand against {@code inputRecord} and converts it to the output type.
+   *
+   * <p>CHAR and VARCHAR targets both produce a {@link SqlTypeName#VARCHAR} primitive holding the
+   * operand's {@code toString()}.
+   *
+   * @param inputRecord the row the operand is evaluated against
+   * @return the converted value wrapped as a primitive of the target type
+   * @throws UnsupportedOperationException if the output type is not supported, or if the value
+   *     cannot be parsed as a DATE or TIMESTAMP
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    // NOTE(review): a null operand value is not handled here. The CHAR/VARCHAR/DATE/TIMESTAMP
+    // branches call toString() on it. Confirm whether CAST(NULL AS ...) can reach this path.
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive.of(SqlTypeName.INTEGER,
+            SqlFunctions.toInt(opValueEvaluated(INDEX, inputRecord)));
+      case DOUBLE:
+        return BeamSqlPrimitive.of(SqlTypeName.DOUBLE,
+            SqlFunctions.toDouble(opValueEvaluated(INDEX, inputRecord)));
+      case SMALLINT:
+        return BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
+            SqlFunctions.toShort(opValueEvaluated(INDEX, inputRecord)));
+      case TINYINT:
+        return BeamSqlPrimitive.of(SqlTypeName.TINYINT,
+            SqlFunctions.toByte(opValueEvaluated(INDEX, inputRecord)));
+      case BIGINT:
+        return BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+            SqlFunctions.toLong(opValueEvaluated(INDEX, inputRecord)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(INDEX, inputRecord)));
+      case FLOAT:
+        return BeamSqlPrimitive.of(SqlTypeName.FLOAT,
+            SqlFunctions.toFloat(opValueEvaluated(INDEX, inputRecord)));
+      case CHAR:
+      case VARCHAR:
+        return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
+            opValueEvaluated(INDEX, inputRecord).toString());
+      case DATE:
+        return BeamSqlPrimitive.of(SqlTypeName.DATE,
+            toDate(opValueEvaluated(INDEX, inputRecord), OUTPUT_DATE_FORMAT));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(INDEX, inputRecord), OUTPUT_TIMESTAMP_FORMAT));
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Cast to type %s not supported", castOutputType));
+    }
+  }
+
+  /**
+   * Parses {@code inputDate.toString()} with {@link #DATE_TIME_FORMATTER} into a calendar date.
+   * Any time-of-day part is dropped.
+   *
+   * @throws UnsupportedOperationException if the text matches none of the accepted layouts; the
+   *     parse failure is attached as the cause
+   */
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date.valueOf(
+          DATE_TIME_FORMATTER.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      // Chain the parse failure so the offending input and pattern mismatch stay diagnosable.
+      throw new UnsupportedOperationException("Can't be cast to type 'Date'", e);
+    }
+  }
+
+  /**
+   * Parses {@code inputTimestamp.toString()} with {@link #DATE_TIME_FORMATTER} into a
+   * whole-second {@link Timestamp}.
+   *
+   * <p>Fractional seconds are rounded UP to the next second ({@code roundCeilingCopy}). For
+   * example, {@code 2017-05-21 23:59:59.989} becomes {@code 2017-05-22 00:00:00}.
+   *
+   * @throws UnsupportedOperationException if the text matches none of the accepted layouts; the
+   *     parse failure is attached as the cause
+   */
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          DATE_TIME_FORMATTER.parseDateTime(inputTimestamp.toString())
+              .secondOfMinute()
+              .roundCeilingCopy()
+              .toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      // Chain the parse failure so the offending input and pattern mismatch stay diagnosable.
+      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/39eedd56/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
new file mode 100644
index 0000000..c2fd68d
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpressionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BeamSqlCastExpression}. Covers operand-count validation, numeric casts, and
+ * parsing of DATE/TIMESTAMP values from integer and string operands.
+ */
+public class BeamSqlCastExpressionTest extends BeamSqlFnExecutorTestBase {
+
+  private List<BeamSqlExpression> operands;
+
+  @Before
+  public void setup() {
+    operands = new ArrayList<>();
+  }
+
+  /** Casts the operands collected so far to {@code castType} and evaluates against the row. */
+  private BeamSqlPrimitive castTo(SqlTypeName castType) {
+    return new BeamSqlCastExpression(operands, castType).evaluate(record);
+  }
+
+  @Test
+  public void testForOperands() {
+    // CAST takes exactly one operand; two must be rejected.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "aaa"));
+    Assert.assertFalse(new BeamSqlCastExpression(operands, SqlTypeName.BIGINT).accept());
+  }
+
+  @Test
+  public void testForIntegerToBigintTypeCasting() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5));
+    Assert.assertEquals(5L, castTo(SqlTypeName.BIGINT).getLong());
+  }
+
+  @Test
+  public void testForDoubleToBigIntCasting() {
+    // The fractional part is discarded.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 5.45));
+    Assert.assertEquals(5L, castTo(SqlTypeName.BIGINT).getLong());
+  }
+
+  @Test
+  public void testForIntegerToDateCast() {
+    // An integer operand is parsed through its string form (yyyyMMdd layout).
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 20170521));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE).getValue());
+  }
+
+  @Test
+  public void testyyyyMMddDateFormat() {
+    // yyyy-MM-dd layout.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE).getValue());
+  }
+
+  @Test
+  public void testyyMMddDateFormat() {
+    // yy.MM.dd layout; the two-digit year resolves to 2017.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17.05.21"));
+    Assert.assertEquals(Date.valueOf("2017-05-21"), castTo(SqlTypeName.DATE).getValue());
+  }
+
+  @Test
+  public void testForTimestampCastExpression() {
+    // Only the output type is asserted here, not the parsed value.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "17-05-21 23:59:59.989"));
+    Assert.assertEquals(SqlTypeName.TIMESTAMP, castTo(SqlTypeName.TIMESTAMP).getOutputType());
+  }
+
+  @Test
+  public void testDateTimeFormatWithMillis() {
+    // Fractional seconds round up, rolling over into the next day.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.989"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        castTo(SqlTypeName.TIMESTAMP).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormatWithTimezone() {
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59.89079 PST"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-22 00:00:00.0"),
+        castTo(SqlTypeName.TIMESTAMP).getValue());
+  }
+
+  @Test
+  public void testDateTimeFormat() {
+    // Whole seconds are kept as-is.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "2017-05-21 23:59:59"));
+    Assert.assertEquals(Timestamp.valueOf("2017-05-21 23:59:59"),
+        castTo(SqlTypeName.TIMESTAMP).getValue());
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testForCastTypeNotSupported() {
+    // The operand's string form (a java.util.Date) matches none of the accepted TIMESTAMP
+    // layouts, so this cast must fail. The test only requires that a RuntimeException is thrown;
+    // there is no resulting value to compare.
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, Calendar.getInstance().getTime()));
+    castTo(SqlTypeName.TIMESTAMP);
+  }
+
+}
[2/2] beam git commit: [BEAM-2424] This closes #3386
Posted by ta...@apache.org.
[BEAM-2424] This closes #3386
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/125cef14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/125cef14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/125cef14
Branch: refs/heads/DSL_SQL
Commit: 125cef14453d625f1758aef5b129182cb13d829c
Parents: 53d27e6 39eedd5
Author: Tyler Akidau <ta...@apache.org>
Authored: Wed Jul 12 10:34:36 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 10:34:36 2017 -0700
----------------------------------------------------------------------
.../dsls/sql/interpreter/BeamSqlFnExecutor.java | 4 +
.../operator/BeamSqlCastExpression.java | 132 +++++++++++++++++++
.../operator/BeamSqlCastExpressionTest.java | 126 ++++++++++++++++++
3 files changed, 262 insertions(+)
----------------------------------------------------------------------