Posted to issues@flink.apache.org by kmurra <gi...@git.apache.org> on 2017/09/29 02:26:47 UTC

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

GitHub user kmurra opened a pull request:

    https://github.com/apache/flink/pull/4746

    [FLINK-7657] [Table] Adding logic to convert RexLiteral to expected SQL Date/Time/Timestamp classes

    ## What is the purpose of the change
    
    This change fixes handling of date, time, and timestamp literal expressions.
    
    A ClassCastException occurs when a date, time, or timestamp literal is included in the WHERE clause of a SQL query. When Calcite trees are converted to an Expression, the RexNodeToExpressionConverter does not convert Literal values to the expected types.
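Here is a minimal Scala sketch of the failure mode. Purely for illustration, it assumes the literal's value reaches the consumer as a `java.util.Calendar` while the consumer expects a `java.sql.Date`. Calcite's exact value class is not shown in this thread. The names `ClassCastSketch`, `rawLiteral`, `naiveCast` and `convert` are hypothetical and are not part of Flink or Calcite.

```scala
import java.sql.Date
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}
import scala.util.Try

object ClassCastSketch {
  // Illustrative literal value: a UTC calendar holding 2017-09-27
  // (an assumption standing in for whatever class the literal actually carries).
  def rawLiteral: Any = {
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(2017, Calendar.SEPTEMBER, 27)
    cal
  }

  // What a consumer expecting java.sql.Date effectively does: a checked cast,
  // which throws ClassCastException when the runtime class does not match.
  def naiveCast(v: Any): Try[Date] = Try(v.asInstanceOf[Date])

  // Converting to the expected class first avoids the exception.
  def convert(v: Any): Date = v match {
    case d: Date => d
    case c: Calendar =>
      Date.valueOf(LocalDate.of(
        c.get(Calendar.YEAR), c.get(Calendar.MONTH) + 1, c.get(Calendar.DAY_OF_MONTH)))
    case other => throw new IllegalArgumentException(s"Unsupported literal: $other")
  }
}
```

`naiveCast(rawLiteral)` yields a `Failure` wrapping a `ClassCastException`. `convert(rawLiteral)` yields a `java.sql.Date` that prints as `2017-09-27`.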
    
    See https://issues.apache.org/jira/browse/FLINK-7657 for more information.
    
    ## Brief change log
    
    - Added a method to the Literal object that takes a RexLiteral and applies the appropriate transforms to date- and time-related fields.
    - Added a default fallback that keeps the old behavior for other field types.
        - Note that handling of other types is incorrect per the public documentation, but fixing it is out of scope for this JIRA.
    - Added comments explaining how values are handled (specifically, why they are shifted based on time zones).
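The change log describes a type dispatch. Date/time literals are converted to the `java.sql` classes Flink expects, and everything else falls back to the old behavior. The sketch below shows that shape under an assumption not taken from the PR: the literal arrives in Calcite's internal encoding. In that encoding, DATE is days since the epoch, TIME is milliseconds of the day, and TIMESTAMP is epoch milliseconds, all read as UTC wall-clock values. The PR itself reads the value through Calcite's `DateString` instead. `LiteralKind` and `LiteralConversion.toExpectedValue` are hypothetical names.

```scala
import java.sql.{Date, Time, Timestamp}
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneOffset}

// Hypothetical type tags standing in for the literal's SQL type (not Flink or Calcite classes).
sealed trait LiteralKind
case object DateKind extends LiteralKind
case object TimeKind extends LiteralKind
case object TimestampKind extends LiteralKind
case object OtherKind extends LiteralKind

object LiteralConversion {
  /** Converts an internally encoded literal (assumed encoding) to the class Flink expects. */
  def toExpectedValue(kind: LiteralKind, internal: Any): Any = kind match {
    case DateKind =>
      // Days since 1970-01-01 -> java.sql.Date at local midnight with the same year/month/day.
      Date.valueOf(LocalDate.ofEpochDay(internal.asInstanceOf[Int].toLong))
    case TimeKind =>
      // Milliseconds of the day -> java.sql.Time with the same hour/minute/second
      // (Time.valueOf(LocalTime) drops the sub-second part).
      val millisOfDay = internal.asInstanceOf[Int].toLong
      Time.valueOf(LocalTime.ofNanoOfDay(millisOfDay * 1000000L))
    case TimestampKind =>
      // Epoch milliseconds read as UTC wall-clock fields -> java.sql.Timestamp
      // carrying the same fields in the JVM's default time zone.
      val millis = internal.asInstanceOf[Long]
      val fields = LocalDateTime.ofEpochSecond(
        Math.floorDiv(millis, 1000L),
        (Math.floorMod(millis, 1000L) * 1000000L).toInt,
        ZoneOffset.UTC)
      Timestamp.valueOf(fields)
    case OtherKind =>
      // Fallback: keep the previous behavior and pass the value through unchanged.
      internal
  }
}
```

Going through `java.time` keeps the calendar fields intact. The resulting `java.sql` values show those same fields in the JVM's default time zone, which matches the local-time behavior the PR aims to preserve.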
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    - Unit test cases were added covering changes
      - Added a unit test case to RexProgramExtractorTest.scala to test that literal conversions produce the appropriate types.
      - Added a unit test case to TableSourceTest.scala to confirm that Expressions received by a table source extending FilterableTableSource do not cause exceptions and have the expected values.
    - Note: Other unit test cases cover possible regressions in output values from the Literal->RexNode conversion
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kmurra/flink kmurra-expression-timestamp-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4746
    
----
commit d8477977694614ed51ad53de55104df10ac55ad4
Author: Kent Murra <ke...@remitly.com>
Date:   2017-09-27T20:48:55Z

    [FLINK-7657] Adding logic to convert RexLiteral to expected SQL Date/Time/Timestamp classes, preventing ClassCastException

----


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142632025
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -49,10 +50,51 @@ object Literal {
         case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
         case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
       }
    +
    +  private[flink] def apply(rexNode: RexLiteral): Literal = {
    --- End diff --
    
    This logic is better located in `RexNodeToExpressionConverter.visitLiteral`.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142799362
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -49,10 +50,51 @@ object Literal {
         case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
         case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
       }
    +
    +  private[flink] def apply(rexNode: RexLiteral): Literal = {
    +    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
    +
    +    val literalValue = literalType match {
    +      // Chrono use cases.  We're force-adjusting the UTC-based epoch timestamps to a new
    +      // timestamp such that we get the same year/month/hour/day field values in the query's
    +      // timezone (UTC)
    +      case _@SqlTimeTypeInfo.DATE =>
    +        val rexValue = rexNode.getValueAs(classOf[DateString])
    +        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
    +        new Date(adjustedCal.getTimeInMillis)
    --- End diff --
    
    Unfortunately, that constructor is deprecated in Java 8, which is why I avoided using it.
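For reference, the JDK provides non-deprecated static factories that parse SQL literal text directly. They are `java.sql.Date.valueOf(String)`, `java.sql.Time.valueOf(String)` and `java.sql.Timestamp.valueOf(String)`. The sketch assumes the literal's string form is already in JDBC escape format (`yyyy-[m]m-[d]d`, `hh:mm:ss`, `yyyy-[m]m-[d]d hh:mm:ss[.f...]`). The `SqlLiteralParsing` object is hypothetical, and whether this fits the final code is a judgment call for the reviewers.

```scala
import java.sql.{Date, Time, Timestamp}

// Hypothetical helper: parses SQL literal text with the JDK's non-deprecated
// java.sql valueOf factories instead of the deprecated java.util.Date(String) constructor.
object SqlLiteralParsing {
  // Expects "yyyy-[m]m-[d]d"; yields local midnight in the JVM's default time zone.
  def parseDate(text: String): Date = Date.valueOf(text)

  // Expects "hh:mm:ss".
  def parseTime(text: String): Time = Time.valueOf(text)

  // Expects "yyyy-[m]m-[d]d hh:mm:ss[.f...]".
  def parseTimestamp(text: String): Timestamp = Timestamp.valueOf(text)
}
```

Each factory builds the value from the text's fields in the default time zone, so `toString` returns the original text whatever the JVM's zone is. Malformed input raises `IllegalArgumentException`.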


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142800400
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -103,13 +148,21 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
         }
       }
     
    -  private def dateToCalendar: Calendar = {
    +  /**
    +    * Convert a date value to a utc calendar.
    +    * <p>
    +    * We're assuming that when the user passes in a Date its constructed from fields,
    +    * such as days and hours, and that they want those fields to be in the same timezone as the
    +    * Calcite times, which are UTC.  Since we need to convert a Date to a Calendar, that means we
    +    * have to shift the epoch millisecond timestamp to account for the difference between UTC and
    +    * local time.
    +    * @return Get the Calendar value
    +    */
    +  private def valueAsUtcCalendar: Calendar = {
         val date = value.asInstanceOf[java.util.Date]
    -    val cal = Calendar.getInstance(Literal.GMT)
    -    val t = date.getTime
    -    // according to Calcite's SqlFunctions.internalToXXX methods
    -    cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))
    --- End diff --
    
    The re-implemented adjustCalendar method is functionally the same as this when toTz is the UTC TimeZone. It's just generalized to allow converting between arbitrary TimeZones, so that I can reuse it in the RexLiteral-to-Expression conversion.
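The sketch below is a hypothetical generalized `adjustCalendar(cal, toTz)` as described in this comment. It builds a calendar in `toTz` that carries the same wall-clock fields as `cal`. The body is an assumption, since the PR's implementation is not shown in this thread. It uses `GregorianCalendar` explicitly so the result does not depend on the default locale's calendar system.

```scala
import java.util.{Calendar, GregorianCalendar, TimeZone}

object CalendarAdjust {
  // Hypothetical: returns a calendar in toTz whose year/month/day/hour/minute/second/millis
  // fields equal those of cal. The epoch milliseconds generally change, the fields do not.
  def adjustCalendar(cal: Calendar, toTz: TimeZone): Calendar = {
    val adjusted = new GregorianCalendar(toTz)
    adjusted.clear()
    adjusted.set(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH), cal.get(Calendar.DAY_OF_MONTH),
      cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND))
    adjusted.set(Calendar.MILLISECOND, cal.get(Calendar.MILLISECOND))
    adjusted
  }
}
```

Take a Los Angeles calendar at 2017-09-27 10:00 with epoch time `t` and adjust it to UTC. The result is a UTC calendar that also reads 10:00. Its epoch milliseconds equal `t + la.getOffset(t)`, which is the old shift with an explicit zone in place of `TimeZone.getDefault`.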


---

[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on the issue:

    https://github.com/apache/flink/pull/4746
  
    The biggest change here is in the test cases -- I generalized the test table source to have some basic filtering logic and allow for generic datasets.
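Below is a minimal, hypothetical sketch of "basic filtering logic" in a test source. `FieldPredicate` and `FilteringTestSource` are illustrative stand-ins loosely modeled on the push-down idea behind `FilterableTableSource.applyPredicate`. They are not Flink classes, and they use plain Scala collections and functions instead of `Expression` trees.

```scala
// Hypothetical predicate on a single field; stands in for a pushed-down Expression.
final case class FieldPredicate(field: String, test: Any => Boolean)

// Hypothetical generic test source over rows represented as field-name -> value maps.
final class FilteringTestSource(
    rows: Seq[Map[String, Any]],
    predicates: Seq[FieldPredicate] = Seq.empty) {

  // Returns a new source with the predicates pushed down; the original is left unchanged.
  def applyPredicate(ps: Seq[FieldPredicate]): FilteringTestSource =
    new FilteringTestSource(rows, predicates ++ ps)

  def isFilterPushedDown: Boolean = predicates.nonEmpty

  // Emits only the rows that satisfy every pushed-down predicate.
  def data: Seq[Map[String, Any]] =
    rows.filter(row => predicates.forall(p => p.test(row(p.field))))
}
```

A predicate that compares a `java.sql.Date` column with `java.sql.Date.valueOf("2017-09-27")` keeps only the matching rows. This kind of check only works when the pushed-down literal has the expected class.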
    
    I moved the Literal build logic to RexNodeToExpressionConverter.visitLiteral. I also rewrote several of the conversion methods to align more closely with the intended behavior of the code: preserving the string values of the various time-related literals in the local time zone. This made many of the epoch-millisecond modifications go away.


---

[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on the issue:

    https://github.com/apache/flink/pull/4746
  
    Regarding the time zones, I think I understand your argument here. Beyond what you have already outlined, is there anything in particular you would want me to change to account for that? I do want to document why we're converting time zones at all, since it took me some time to understand why it was being done (it looked incorrect to me and to several other developers at first glance).
    
    Also, I noticed that Calcite's fromCalendarFields methods simply take the fields directly from the Calendar, so time-zone adjustments are unnecessary after my changes to toRexNode. I'll fix that as well in my next commit.
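The sketch below illustrates why no extra shift is needed when a factory reads calendar fields directly. The hypothetical `CalendarFields.dateFields` reads year, month and day as they are, so the only thing that matters is which fields the calendar holds. Shifting the epoch milliseconds beforehand would change those fields. This is an illustrative analogue, not Calcite's `fromCalendarFields` itself.

```scala
import java.util.{Calendar, GregorianCalendar, TimeZone}

object CalendarFields {
  // Hypothetical analogue of a "from calendar fields" factory: it formats the fields
  // exactly as the calendar reports them and applies no time-zone offset of its own.
  def dateFields(cal: Calendar): String =
    f"${cal.get(Calendar.YEAR)}%04d-${cal.get(Calendar.MONTH) + 1}%02d-${cal.get(Calendar.DAY_OF_MONTH)}%02d"
}
```

For a UTC `GregorianCalendar` set to 2017-09-27 03:00, `dateFields` returns `2017-09-27`. Moving the same calendar's epoch milliseconds back by seven hours first yields `2017-09-26`, which shows how a redundant adjustment changes the result.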


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4746


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142633894
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.utils
    +
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    +import org.apache.flink.table.expressions.Expression
    +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource}
    +import org.apache.flink.types.Row
    +
    +import java.util
    +import java.util.Collections
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * A table source that takes in assertions and applies them when applyPredicate is called.
    +  * Allows for testing that expression push downs are handled properly
    +  * @param typeInfo The type info.
    +  * @param assertions A set of assertions as a function reference
    +  * @param pushedDown Whether this has been pushed down/
    +  */
    +class CheckExpressionsTableSource(typeInfo: RowTypeInfo,
    --- End diff --
    
    Maybe it would be better to generalize `TestFilterableTableSource`.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142642598
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -49,10 +50,51 @@ object Literal {
         case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
         case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
       }
    +
    +  private[flink] def apply(rexNode: RexLiteral): Literal = {
    +    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
    +
    +    val literalValue = literalType match {
    +      // Chrono use cases.  We're force-adjusting the UTC-based epoch timestamps to a new
    +      // timestamp such that we get the same year/month/hour/day field values in the query's
    +      // timezone (UTC)
    +      case _@SqlTimeTypeInfo.DATE =>
    +        val rexValue = rexNode.getValueAs(classOf[DateString])
    +        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
    +        new Date(adjustedCal.getTimeInMillis)
    --- End diff --
    
    This can be simplified to `new Date(rexValue.toString)`.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142801455
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -49,10 +50,51 @@ object Literal {
         case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
         case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
       }
    +
    +  private[flink] def apply(rexNode: RexLiteral): Literal = {
    +    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)
    +
    +    val literalValue = literalType match {
    +      // Chrono use cases.  We're force-adjusting the UTC-based epoch timestamps to a new
    +      // timestamp such that we get the same year/month/hour/day field values in the query's
    +      // timezone (UTC)
    +      case _@SqlTimeTypeInfo.DATE =>
    +        val rexValue = rexNode.getValueAs(classOf[DateString])
    +        val adjustedCal = adjustCalendar(rexValue.toCalendar, TimeZone.getDefault)
    +        new Date(adjustedCal.getTimeInMillis)
    --- End diff --
    
    Do you want me to use the deprecated constructor or leave this as-is?


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142800611
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.utils
    +
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    +import org.apache.flink.table.expressions.Expression
    +import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource}
    +import org.apache.flink.types.Row
    +
    +import java.util
    +import java.util.Collections
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * A table source that takes in assertions and applies them when applyPredicate is called.
    +  * Allows for testing that expression push downs are handled properly
    +  * @param typeInfo The type info.
    +  * @param assertions A set of assertions as a function reference
    +  * @param pushedDown Whether this has been pushed down/
    +  */
    +class CheckExpressionsTableSource(typeInfo: RowTypeInfo,
    --- End diff --
    
    I'll look at doing that. It was my initial approach, but when I saw how many test cases it would affect, I decided on something more conservative. It's still doable.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142801313
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -49,10 +50,51 @@ object Literal {
         case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
         case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
       }
    +
    +  private[flink] def apply(rexNode: RexLiteral): Literal = {
    --- End diff --
    
    I'll follow your standards for the codebase. However, let me voice a disagreement here:
    
    The Literal class does the conversion from the Literal back to the RexLiteral. Putting this logic in RexNodeToExpressionConverter physically separates the RexLiteral-to-Literal logic from the Literal-to-RexLiteral logic. That makes it slightly easier for a contributor to change one side of the conversion without accounting for the other. In particular, the date adjustments become harder to understand, since the context is split between two different files.



---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by kmurra <gi...@git.apache.org>.
Github user kmurra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142800631
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ---
    @@ -200,10 +201,30 @@ case class BatchTableTestUtil() extends TableTestUtil {
         printTable(tableEnv.sqlQuery(query))
       }
     
    +  def verifyExpressionProjection(fields: Seq[(String, TypeInformation[_])],
    --- End diff --
    
    I will move this.


---

[GitHub] flink issue #4746: [FLINK-7657] [Table] Adding logic to convert RexLiteral t...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4746
  
    Thanks for the update @kmurra. I will go over the changes a last time and merge this.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142641600
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---
    @@ -103,13 +148,21 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
         }
       }
     
    -  private def dateToCalendar: Calendar = {
    +  /**
    +    * Convert a date value to a utc calendar.
    +    * <p>
    +    * We're assuming that when the user passes in a Date its constructed from fields,
    +    * such as days and hours, and that they want those fields to be in the same timezone as the
    +    * Calcite times, which are UTC.  Since we need to convert a Date to a Calendar, that means we
    +    * have to shift the epoch millisecond timestamp to account for the difference between UTC and
    +    * local time.
    +    * @return Get the Calendar value
    +    */
    +  private def valueAsUtcCalendar: Calendar = {
         val date = value.asInstanceOf[java.util.Date]
    -    val cal = Calendar.getInstance(Literal.GMT)
    -    val t = date.getTime
    -    // according to Calcite's SqlFunctions.internalToXXX methods
    -    cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))
    --- End diff --
    
    Could you explain why `cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))` did not produce the correct result? Calcite is also doing it like that.


---

[GitHub] flink pull request #4746: [FLINK-7657] [Table] Adding logic to convert RexLi...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4746#discussion_r142634197
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ---
    @@ -200,10 +201,30 @@ case class BatchTableTestUtil() extends TableTestUtil {
         printTable(tableEnv.sqlQuery(query))
       }
     
    +  def verifyExpressionProjection(fields: Seq[(String, TypeInformation[_])],
    --- End diff --
    
    The `TableTestBase` class is intended for general test utilities. This looks very specific to me. I would add it to the `TableSourceTest` or to a utils class there.


---