Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/06/19 15:10:54 UTC

[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4144

    [FLINK-6881] [FLINK-6896] [table] Creating a table from a POJO and defining a time attribute fails

    This PR fixes several issues with POJOs and time attributes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-6881_NEW

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4144
    
----
commit 0f5793159ca97d6d7e9f8a4b9fab3a3a2479fab8
Author: twalthr <tw...@apache.org>
Date:   2017-06-19T15:06:44Z

    [FLINK-6881] [FLINK-6896] [table] Creating a table from a POJO and defining a time attribute fails

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4144



[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4144#discussion_r122937741
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1666,14 +1666,8 @@ class CodeGenerator(
         : GeneratedExpression = {
    --- End diff --
    
    We could. Maybe I have to check that again.



[GitHub] flink issue #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table from a...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4144
  
    I will merge this...



[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4144#discussion_r122924846
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1666,14 +1666,8 @@ class CodeGenerator(
         : GeneratedExpression = {
    --- End diff --
    
    `fieldMapping` is never used. Should we remove it?



[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4144#discussion_r122922340
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    -    exprs.zipWithIndex.foreach {
    -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
    -        if (rowtime.isDefined) {
    +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
    +      if (rowtime.isDefined) {
    +        throw new TableException(
    +          "The rowtime attribute can only be defined once in a table schema.")
    +      } else {
    +        val mappedIdx = streamType match {
    +          case pti: PojoTypeInfo[_] =>
    +            pti.getFieldIndex(origName.getOrElse(name))
    --- End diff --
    
    Thanks @sunjincheng121. Good point.



[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4144#discussion_r122852900
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    -    exprs.zipWithIndex.foreach {
    -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
    -        if (rowtime.isDefined) {
    +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
    +      if (rowtime.isDefined) {
    +        throw new TableException(
    +          "The rowtime attribute can only be defined once in a table schema.")
    +      } else {
    +        val mappedIdx = streamType match {
    +          case pti: PojoTypeInfo[_] =>
    +            pti.getFieldIndex(origName.getOrElse(name))
    --- End diff --
    
    When a user misspells the row-time property name of a POJO, e.g.
    `(recordTimeA as rowtime).rowtime` where the correct name is `recordTime`,
    they get the following exception:
    ```
    Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
    	at org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractRowtime$1(StreamTableEnvironment.scala:453)
    	at org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:484)
    ```
    I suggest that:
    1. Maybe we need to check the row-time property name of the POJO as early as possible.
    2. We should check that the index value is >= 0. If it is not, we should throw an exception with a clear error message.
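
A minimal sketch of the index check suggested above, written without Flink on the classpath. The names `RowtimeIndexCheck`, `resolvePojoFieldIndex` and `fieldNames` are hypothetical, and the local `TableException` class is only a stand-in for Flink's exception of the same name. The sketch assumes that the field lookup returns -1 for an unknown name, which is what the stack trace above suggests; it is not the code that was merged in the pull request.

```scala
// Hypothetical stand-in for Flink's TableException so the sketch compiles on its own.
class TableException(msg: String) extends RuntimeException(msg)

object RowtimeIndexCheck {

  // Resolves a POJO field name to its index and fails fast with a descriptive
  // message instead of letting a negative index reach an array access.
  // `fieldNames` is an assumed stand-in for the fields known to the POJO type info.
  def resolvePojoFieldIndex(fieldNames: Array[String], name: String): Int = {
    // indexOf returns -1 when the name is absent, mirroring the failing lookup.
    val idx = fieldNames.indexOf(name)
    if (idx < 0) {
      throw new TableException(
        s"The rowtime attribute refers to an unknown POJO field '$name'. " +
          s"Available fields: ${fieldNames.mkString(", ")}.")
    }
    idx
  }
}
```

With the fields `recordTime` and `value`, resolving `recordTimeA` would raise a `TableException` that names the missing field, rather than an `ArrayIndexOutOfBoundsException`.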



[GitHub] flink issue #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table from a...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/4144
  
    +1



[GitHub] flink pull request #4144: [FLINK-6881] [FLINK-6896] [table] Creating a table...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4144#discussion_r122829892
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -437,39 +437,64 @@ abstract class StreamTableEnvironment(
         var rowtime: Option[(Int, String)] = None
         var proctime: Option[(Int, String)] = None
     
    -    exprs.zipWithIndex.foreach {
    -      case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
    -        if (rowtime.isDefined) {
    +    def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
    +      if (rowtime.isDefined) {
    +        throw new TableException(
    +          "The rowtime attribute can only be defined once in a table schema.")
    +      } else {
    +        val mappedIdx = streamType match {
    +          case pti: PojoTypeInfo[_] =>
    +            pti.getFieldIndex(origName.getOrElse(name))
    +          case _ => idx;
    +        }
    +        // check type of field that is replaced
    +        if (mappedIdx < fieldTypes.length &&
    +          !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
    +            TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
               throw new TableException(
    -            "The rowtime attribute can only be defined once in a table schema.")
    -        } else {
    -          // check type of field that is replaced
    -          if (idx < fieldTypes.length &&
    -            !(TypeCheckUtils.isLong(fieldTypes(idx)) ||
    -              TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
    -            throw new TableException(
    -              "The rowtime attribute can only be replace a field with a valid time type, such as " +
    -                "Timestamp or Long.")
    -          }
    -          rowtime = Some(idx, name)
    +            s"The rowtime attribute can only be replace a field with a valid time type, " +
    --- End diff --
    
    remove "be" -> `"The rowtime attribute can only replace a field with ..."`

