Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/12/02 23:58:07 UTC

Primary keys go missing after using TableFunction + leftOuterJoinLateral

Hello,

I have a TableFunction and wherever it is applied with a
leftOuterJoinLateral, my table loses any inference of there being a primary
key. I see this because all subsequent joins end up with "NoUniqueKey" when
I know a primary key of id should exist.

I'm wondering if this is expected behavior and if it's possible to tell a
table directly what the primary key should be?


To demonstrate my example:
My table function checks whether each of a fixed set of values is present in
a string array and appends one boolean column per value. For example,
suppose the array column "fruits" can contain orange, banana, apple, and
watermelon. If a row's array is `["orange", "apple"]`, the function appends
`has_orange: true, has_banana: false, has_apple: true, has_watermelon: false`
as columns to the row. This example is essentially the same as my code,
except that my real code has a much larger set of keys and does not deal
with fruits.
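
As a plain Scala illustration of the per-row logic described above (no Flink
involved), a minimal sketch might look like this. `SplatSketch`, `Keys`, and
`splat` are hypothetical names used only for illustration, and treating a
null array as empty is an assumption.

```scala
// Hypothetical, Flink-free sketch of the per-row logic; all names are illustrative.
object SplatSketch {
  // The fixed set of values to look for, in output-column order.
  val Keys: Seq[String] = Seq("orange", "banana", "apple", "watermelon")

  // Returns one (columnName, isPresent) pair per key.
  // Assumption: a null array is treated as an empty array.
  def splat(fruits: Array[String], columnPrefix: String): Seq[(String, Boolean)] = {
    val present: Set[String] =
      Option(fruits).map(_.toSet).getOrElse(Set.empty[String])
    Keys.map(key => (s"${columnPrefix}_has_$key", present.contains(key)))
  }
}
```

Calling `SplatSketch.splat(Array("orange", "apple"), "prefix")` yields one
flag per key, with only the orange and apple flags set to true.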

Example code:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
  return table
    .leftOuterJoinLateral(
      new SplatFruitsFunc()(
        $"fruits"
      ) as (
        s"${columnPrefix}_has_orange",
        s"${columnPrefix}_has_banana",
        s"${columnPrefix}_has_apple",
        s"${columnPrefix}_has_watermelon"
      )
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
  )
)
class SplatFruitsFunc
    extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {

  def eval(fruits: Array[String]): Unit = {
    val hasOrange: java.lang.Boolean = fruits.contains("orange")
    val hasBanana: java.lang.Boolean = fruits.contains("banana")
    val hasApple: java.lang.Boolean = fruits.contains("apple")
    val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
    collect(hasOrange, hasBanana, hasApple, hasWatermelon)
  }
}

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Rex Fenley <Re...@remind101.com>.
Thanks, but how would I do this in the Table API? With map? I tried a map
function, and it still wasn't able to infer the primary key unless I
artificially applied an aggregate function over the pk instead.


Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Jark Wu <im...@gmail.com>.
Could you use 4 scalar functions instead of a UDTF and map function? For
example:

select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits),
hasWatermelon(fruits)
from T;

I think this can preserve the primary key.

Best,
Jark
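
In the Scala Table API, the same idea might be expressed roughly as below.
This is only a sketch under assumptions: `HasFruit`, `SplatWithScalars`, and
`fruitNames` are hypothetical names that do not appear in this thread, the
scalar functions are attached with `addColumns` rather than `map`, and
whether the planner keeps the unique key through this projection has not
been verified here.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical scalar function: true when `fruit` occurs in the array.
// Assumption: a null array counts as "not present".
class HasFruit(fruit: String) extends ScalarFunction {
  def eval(fruits: Array[String]): Boolean =
    fruits != null && fruits.contains(fruit)
}

object SplatWithScalars {
  // Illustrative key set; the real code would use its own, larger list.
  private val fruitNames = Seq("orange", "banana", "apple", "watermelon")

  // table is assumed to have a pk `id` and an array column `fruits`.
  def splatFruits(table: Table, columnPrefix: String): Table = {
    // One aliased scalar-function call per key.
    val flagColumns: Seq[Expression] = fruitNames.map { name =>
      val hasFruit = new HasFruit(name)
      hasFruit($"fruits") as s"${columnPrefix}_has_$name"
    }
    table
      .addColumns(flagColumns: _*) // keeps all existing columns, including id
      .renameColumns($"fruits" as s"${columnPrefix}_fruits")
  }
}
```

Unlike `map`, `addColumns` keeps the input columns, so no join back to the
original table should be needed in this variant.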

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Rex Fenley <Re...@remind101.com>.
It appears that even when I pass id through the map function and join back
with the original table, Flink does not seem to treat the id passed through
map as a unique key. Is there any way to solve this while still preserving
the primary key?


Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Rex Fenley <Re...@remind101.com>.
Even odder, if I pull the constructor of the function into its own variable,
it "works". However, it appears that map only passes through the fields
mapped over, which means I'll need an additional join, though I now think
I'm on the right path.

I.e.
def splatFruits(table: Table, columnPrefix: String): Table = {
  val func = new SplatFruitsFunc()
  return table
    .map(func($"fruits"))
    .as(
      s"${columnPrefix}_has_orange",
      s"${columnPrefix}_has_banana",
      s"${columnPrefix}_has_apple",
      s"${columnPrefix}_has_watermelon"
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

ends up giving me the following error instead
> org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Cannot resolve field [fruits], input field
list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
prefix_has_watermelon].

which implies I'll need to join back to the original table, as I was
originally doing with the leftOuterJoinLateral, I suppose.


On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <Re...@remind101.com> wrote:

> Looks like `as` needed to move outside of where it was before to fix that
> error. Though now I'm receiving
> >org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Aliasing more fields than we actually have.
>
> Example code now:
>
> // table will always have pk id
> def splatFruits(table: Table, columnPrefix: String): Table = {
> return table
>  .map(
>    new SplatFruitsFunc()(
>      $"fruits"
>    )
>  )
>  .as(
>   s"${columnPrefix}_has_orange",
>   s"${columnPrefix}_has_banana",
>   s"${columnPrefix}_has_apple",
>   s"${columnPrefix}_has_watermelon"
>  )
>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> class SplatFruitsFunc extends ScalarFunction {
>   def eval(fruits: Array[String]): Row = {
>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>   }
>
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
> }
>
> which afaict correctly follows the documentation.
>
> Anything here stand out?
>
> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> So I just instead tried changing SplatFruitsFunc to a ScalarFunction and
>> leftOuterJoinLateral to a map and I'm receiving:
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Only a scalar function can be used in the map
>> operator.
>> which seems odd because documentation says
>>
>> > Performs a map operation with a user-defined scalar function or
>> built-in scalar function. The output will be flattened if the output type
>> is a composite type.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>
>> Shouldn't this work as an alternative?
>>
>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I have a TableFunction and wherever it is applied with a
>>> leftOuterJoinLateral, my table loses any inference of there being a primary
>>> key. I see this because all subsequent joins end up with "NoUniqueKey" when
>>> I know a primary key of id should exist.
>>>
>>> I'm wondering if this is expected behavior and if it's possible to tell
>>> a table directly what the primary key should be?
>>>
>>>
>>> To demonstrate my example:
>>> My table function checks if an element of a certain type is in a string
>>> array, and depending on whether or not it is there, it appends a column
>>> with value true or false. For example, if array "fruits" which could
>>> possibly contain orange, banana, apple, and watermelon on a row contains
>>> only `["orange", "apple"]` then it will append `has_orange: true,
>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>> the row. This example is essentially the same as my code, outside of having
>>> a much larger set of keys and not dealing with fruits.
>>>
>>> Example code:
>>>
>>> // table will always have pk id
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>> return table
>>>  .leftOuterJoinLateral(
>>>    new SplatFruitsFunc()(
>>>      $"fruits"
>>>    ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>>> s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>>  )
>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> @FunctionHint(
>>>   output = new DataTypeHint(
>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
>>> has_watermelon BOOLEAN)"
>>>   )
>>> )
>>> class SplatFruitsFunc
>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>
>>>   def eval(fruits: Array[String]): Unit = {
>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>   }
>>> }
>>>
>>> Thanks!
>>>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Rex Fenley <Re...@remind101.com>.
Looks like `as` needed to move outside of where it was before to fix that
error. Now, though, I'm receiving:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Aliasing more fields than we actually have.

Example code now:

// table will always have pk id
def splatFruits(table: Table, columnPrefix: String): Table = {
  table
    .map(
      new SplatFruitsFunc()($"fruits")
    )
    .as(
      s"${columnPrefix}_has_orange",
      s"${columnPrefix}_has_banana",
      s"${columnPrefix}_has_apple",
      s"${columnPrefix}_has_watermelon"
    )
    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}

class SplatFruitsFunc extends ScalarFunction {
  def eval(fruits: Array[String]): Row = {
    val hasOrange: java.lang.Boolean = fruits.contains("Orange")
    val hasBanana: java.lang.Boolean = fruits.contains("Banana")
    val hasApple: java.lang.Boolean = fruits.contains("Apple")
    val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
    Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
}

which, as far as I can tell, correctly follows the documentation.

Does anything here stand out?
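
A side note, separate from the aliasing error: the ScalarFunction version above checks for capitalized names ("Orange", "Apple"), while the original example data was lower case (`["orange", "apple"]`). `Array.contains` is an exact, case-sensitive match, so with lower-case data every flag would come out false. Below is a minimal plain-Scala sketch of that behavior with no Flink involved. The object name `SplatFruitsSketch` and the helpers `flagsExact` and `flagsIgnoreCase` are hypothetical and only illustrate the membership check. They are not part of the job.

```scala
object SplatFruitsSketch {

  // Exact match, which is what Array.contains does: "Orange" != "orange".
  def flagsExact(fruits: Array[String], wanted: Seq[String]): Seq[Boolean] =
    wanted.map(key => fruits.contains(key))

  // Hypothetical variant: lower-case both sides before comparing.
  def flagsIgnoreCase(fruits: Array[String], wanted: Seq[String]): Seq[Boolean] = {
    val present = fruits.map(_.toLowerCase).toSet
    wanted.map(key => present.contains(key.toLowerCase))
  }

  def main(args: Array[String]): Unit = {
    val row = Array("orange", "apple") // example row from this thread
    val wanted = Seq("Orange", "Banana", "Apple", "Watermelon")
    println(flagsExact(row, wanted))      // all false: case differs
    println(flagsIgnoreCase(row, wanted)) // orange and apple are found
  }
}
```

If the real data is capitalized the way the function expects, this does not apply.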

On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <Re...@remind101.com> wrote:

> So I instead tried changing SplatFruitsFunc to a ScalarFunction and
> leftOuterJoinLateral to a map, and I'm receiving:
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: Only a scalar function can be used in the map
> > operator.
> which seems odd because the documentation says
>
> > Performs a map operation with a user-defined scalar function or built-in
> > scalar function. The output will be flattened if the output type is a
> > composite type.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>
> Shouldn't this work as an alternative?



Re: Primary keys go missing after using TableFunction + leftOuterJoinLateral

Posted by Rex Fenley <Re...@remind101.com>.
So I instead tried changing SplatFruitsFunc to a ScalarFunction and
leftOuterJoinLateral to a map, and I'm receiving:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Only a scalar function can be used in the map
> operator.
which seems odd because the documentation says

> Performs a map operation with a user-defined scalar function or built-in
> scalar function. The output will be flattened if the output type is a
> composite type.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Shouldn't this work as an alternative?


