Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/11/02 05:50:02 UTC

Re: LEGACY('STRUCTURED_TYPE' to pojo

Maybe this is related to this issue?
https://issues.apache.org/jira/browse/FLINK-17683

On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <Re...@remind101.com> wrote:

> Correction, I'm using Scala case classes not strictly Java POJOs just to
> be clear.
>
> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Hello,
>>
>> I keep running into trouble moving between DataStream and SQL with POJOs
>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', is there any
>> way to convert them back to POJOs in Flink when converting a SQL Table back
>> to a DataStream?
>>
>> Thanks!
>>

Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Timo Walther <tw...@apache.org>.
It was planned for 1.12 but didn't make it. 1.13 should fully implement 
FLIP-136. I just created issues to monitor the progress:

https://issues.apache.org/jira/browse/FLINK-19976

Regards,
Timo

On 04.11.20 18:43, Rex Fenley wrote:
> Thank you for the info!
> 
> Is there a timetable for when the next version with this change might 
> release?
> 
> On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <twalthr@apache.org> wrote:
> 
>     [...]


Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Rex Fenley <Re...@remind101.com>.
Thank you for the info!

Is there a timetable for when the next version with this change might
release?

On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <tw...@apache.org> wrote:

> Hi Rex,
>
> sorry for the late reply. POJOs will have much better support in the
> upcoming Flink versions because they have been fully integrated with the
> new table type system mentioned in FLIP-37 [1] (e.g. support for
> immutable POJOs and nested DataTypeHints etc).
>
> For queries, scalar, and table functions you can already use the full
> POJOs within the table ecosystem.
>
> However, the only missing piece is the new translation of POJOs from
> Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
> then I would recommend either using `Row` as the output of the Table
> API, or applying a scalar function beforehand that maps to the desired
> data structure.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
> On 02.11.20 21:44, Rex Fenley wrote:
> > [...]
>


Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Timo Walther <tw...@apache.org>.
Hi Rex,

sorry for the late reply. POJOs will have much better support in the 
upcoming Flink versions because they have been fully integrated with the 
new table type system mentioned in FLIP-37 [1] (e.g. support for 
immutable POJOs and nested DataTypeHints etc).
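
(For illustration only, not code from this thread: a minimal sketch of a
DataTypeHint on a scalar function under the new type system, using a
hypothetical ParseProduct helper.)

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Hypothetical helper: the hint pins the function's result type to an
// explicit ROW instead of relying on reflective type extraction.
class ParseProduct extends ScalarFunction {
  @DataTypeHint("ROW<name STRING, id BIGINT>")
  def eval(s: String): Row = {
    val Array(name, id) = s.split(':')
    Row.of(name, java.lang.Long.valueOf(id))
  }
}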

For queries, scalar, and table functions you can already use the full 
POJOs within the table ecosystem.

However, the only missing piece is the new translation of POJOs from
Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
then I would recommend either using `Row` as the output of the Table
API, or applying a scalar function beforehand that maps to the desired
data structure.
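
(A minimal sketch of the `Row` route, assuming the Order/ProductItem case
classes and the `result` table from the test quoted below, with field order
user, product, amount, and the test's
org.apache.flink.streaming.api.scala._ import in scope:)

import org.apache.flink.types.Row

// Bridge through Row and rebuild the case classes by hand.
val rows: DataStream[Row] = result.toAppendStream[Row]
val orders: DataStream[Order] = rows.map { row =>
  // The nested POJO comes back as a nested ROW<name, id>.
  val product = row.getField(1).asInstanceOf[Row]
  Order(
    row.getField(0).asInstanceOf[Long],
    ProductItem(
      product.getField(0).asInstanceOf[String],
      product.getField(1).asInstanceOf[Long]
    ),
    row.getField(2).asInstanceOf[Int]
  )
}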

I hope this helps a bit.

Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 02.11.20 21:44, Rex Fenley wrote:
> My jobs normally use the blink planner; I noticed with this test that it
> may not be the case.
> 
> On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <Rex@remind101.com> wrote:
> 
>     Flink 1.11.2 with Scala 2.12
> 
>     Error:
>     [info] JobScalaTest:
>     [info] - dummy *** FAILED ***
>     [info]   org.apache.flink.table.api.ValidationException: Field types
>     of query result and registered TableSink  do not match.
>     [info] Query schema: [user: BIGINT, product: ROW<`name`
>     VARCHAR(2147483647), `id` BIGINT>, amount: INT]
>     [info] Sink schema: [user: BIGINT, product:
>     LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
>     rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuT
GlzdFNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
>     amount: INT]
>     [info]   at
>     org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
>     [info]   at
>     org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
>     [info]   at
>     org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
>     [info]   at
>     scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>     [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
>     [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     [info]   at
>     scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     [info]   at
>     scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> 
>     Code:
>     import com.remind.graph.people.PeopleJobScala
> 
>     import org.scalatest.funsuite._
>     import org.scalatest.BeforeAndAfter
> 
>     import org.apache.flink.streaming.api.scala.{
>     DataStream,
>     StreamExecutionEnvironment
>     }
>     import org.apache.flink.streaming.util.TestStreamEnvironment
>     import org.apache.flink.table.runtime.util._
>     import org.apache.flink.test.util.AbstractTestBase
>     import org.apache.flink.table.api._
>     import org.apache.flink.table.api.bridge.scala._
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
>     import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
>     import org.apache.flink.api.common.state.ListState
>     import org.apache.flink.runtime.state.FunctionInitializationContext
>     import org.apache.flink.api.common.state.ListStateDescriptor
>     import org.apache.flink.runtime.state.FunctionSnapshotContext
>     import org.apache.flink.types.Row
> 
>     import java.io.Serializable;
>     import java.sql.Timestamp;
>     import java.text.SimpleDateFormat
>     import java.util.concurrent.atomic.AtomicInteger
>     import java.{util => ju}
> 
>     import scala.collection.JavaConverters._
>     import scala.collection.mutable
>     import scala.util.Try
> 
>     case class Order(user: Long, product: ProductItem, amount: Int) {
>       def this() {
>         this(0, null, 0)
>       }
>
>       override def toString(): String = {
>         return "Order{" +
>           "user=" + user +
>           ", product='" + product + '\'' +
>           ", amount=" + amount +
>           '}';
>       }
>     }
>
>     case class ProductItem(name: String, id: Long) {
>       def this() {
>         this(null, 0)
>       }
>
>       override def toString(): String = {
>         return "Product{" +
>           "name='" + name + '\'' +
>           ", id=" + id +
>           '}';
>       }
>     }
>
>     class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
>       var env: StreamExecutionEnvironment = _
>       var tEnv: StreamTableEnvironment = _
>
>       before {
>         this.env = StreamExecutionEnvironment.getExecutionEnvironment
>         this.env.setParallelism(2)
>         this.env.getConfig.enableObjectReuse()
>         val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
>         this.tEnv = StreamTableEnvironment.create(env, setting)
>       }
>
>       after {
>         StreamTestSink.clear()
>         // TestValuesTableFactory.clearAllData()
>       }
>
>       def dateFrom(stringDate: String): java.sql.Date = {
>         val date = new SimpleDateFormat("dd/MM/yyyy")
>           .parse(stringDate)
>         return new java.sql.Date(date.getTime())
>       }
>
>       def printTable(table: Table) = {
>         println(table)
>         table.printSchema()
>         println(table.getSchema().getFieldNames().mkString(", "))
>       }
>
>       def printDataStream(dataStream: DataStream[_]) = {
>         println(dataStream)
>         println(dataStream.dataType)
>       }
>
>       test("dummy") {
>         val orderA: DataStream[Order] = this.env.fromCollection(
>           Seq(
>             new Order(1L, new ProductItem("beer", 10L), 3),
>             new Order(1L, new ProductItem("diaper", 11L), 4),
>             new Order(3L, new ProductItem("rubber", 12L), 2)
>           )
>         )
>
>         val orderB: DataStream[Order] = this.env.fromCollection(
>           Seq(
>             new Order(2L, new ProductItem("pen", 13L), 3),
>             new Order(2L, new ProductItem("rubber", 12L), 3),
>             new Order(4L, new ProductItem("beer", 10L), 1)
>           )
>         )
>
>         println(orderB)
>         println(orderB.dataType)
>
>         // convert DataStream to Table
>         val tableA =
>           this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
>         println(tableA)
>         tableA.printSchema()
>         println(tableA.getSchema().getFieldNames().mkString(", "))
>         // register DataStream as Table
>         this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product,
>           'amount)
>
>         // union the two tables
>         val result = this.tEnv.sqlQuery(s"""
>           |SELECT * FROM $tableA WHERE amount > 2
>           |UNION ALL
>           |SELECT * FROM OrderB WHERE amount < 2
>           """.stripMargin)
>
>         val sink = new StringSink[Order]()
>         result.toAppendStream[Order].addSink(sink)
>
>         this.env.execute()
>
>         val expected = List(
>           "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
>           "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
>           "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
>         )
>         val results = sink.getResults.sorted
>         println("results")
>         println(results)
>         assert(expected.sorted === results)
>       }
>     }
>
>     /**
>      * Taken from:
>      * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
>      * There's a whole bunch of other test sinks to choose from there.
>      */
>     object StreamTestSink {
>
>       val idCounter: AtomicInteger = new AtomicInteger(0)
>
>       val globalResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>       val globalRetractResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>       val globalUpsertResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
>       def getNewSinkId: Int = {
>         val idx = idCounter.getAndIncrement()
>         this.synchronized {
>           globalResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>           )
>           globalRetractResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>           )
>           globalUpsertResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.Map[String, String]]
>           )
>         }
>         idx
>       }
>
>       def clear(): Unit = {
>         globalResults.clear()
>         globalRetractResults.clear()
>         globalUpsertResults.clear()
>       }
>     }
>
>     abstract class AbstractExactlyOnceSink[T]
>         extends RichSinkFunction[T]
>         with CheckpointedFunction {
>       protected var resultsState: ListState[String] = _
>       protected var localResults: mutable.ArrayBuffer[String] = _
>       protected val idx: Int = StreamTestSink.getNewSinkId
>
>       protected var globalResults: mutable.Map[Int,
>         mutable.ArrayBuffer[String]] = _
>       protected var globalRetractResults
>           : mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>       protected var globalUpsertResults
>           : mutable.Map[Int, mutable.Map[String, String]] = _
>
>       def isInitialized: Boolean = globalResults != null
>
>       override def initializeState(context: FunctionInitializationContext): Unit = {
>         resultsState = context.getOperatorStateStore
>           .getListState(
>             new ListStateDescriptor[String]("sink-results", Types.STRING)
>           )
>
>         localResults = mutable.ArrayBuffer.empty[String]
>
>         if (context.isRestored) {
>           for (value <- resultsState.get().asScala) {
>             localResults += value
>           }
>         }
>
>         val taskId = getRuntimeContext.getIndexOfThisSubtask
>         StreamTestSink.synchronized(
>           StreamTestSink.globalResults(idx) += (taskId -> localResults)
>         )
>       }
>
>       override def snapshotState(context: FunctionSnapshotContext): Unit = {
>         resultsState.clear()
>         for (value <- localResults) {
>           resultsState.add(value)
>         }
>       }
>
>       protected def clearAndStashGlobalResults(): Unit = {
>         if (globalResults == null) {
>           StreamTestSink.synchronized {
>             globalResults = StreamTestSink.globalResults.remove(idx).get
>             globalRetractResults =
>               StreamTestSink.globalRetractResults.remove(idx).get
>             globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
>           }
>         }
>       }
>
>       protected def getResults: List[String] = {
>         clearAndStashGlobalResults()
>         val result = mutable.ArrayBuffer.empty[String]
>         this.globalResults.foreach {
>           case (_, list) => result ++= list
>         }
>         result.toList
>       }
>     }
>
>     final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
>       override def invoke(value: T) {
>         localResults += value.toString
>       }
>
>       override def getResults: List[String] = super.getResults
>     }
> 
> 
> 
>     On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
>         @Timo: Is this something that would work when using the new type
>         stack? From the message I'm assuming it's using the older type stack.
>
>         @Rex: Which Flink version are you using, and could you maybe post
>         the code snippet that you use to do the conversions?
> 
>         Best,
>         Aljoscha
> 
>         On 02.11.20 06:50, Rex Fenley wrote:
>          > Maybe this is related to this issue?
>          > https://issues.apache.org/jira/browse/FLINK-17683
>          >
>          > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <Rex@remind101.com
>         <ma...@remind101.com>> wrote:
>          >
>          >> Correction, I'm using Scala case classes not strictly Java
>         POJOs just to
>          >> be clear.
>          >>
>          >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley
>         <Rex@remind101.com <ma...@remind101.com>> wrote:
>          >>
>          >>> Hello,
>          >>>
>          >>> I keep running into trouble moving between DataStream and
>         SQL with POJOs
>          >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE',
>         is there any
>          >>> way to convert them back to POJOs in Flink when converting
>         a SQL Table back
>          >>> to a DataStream?
>          >>>
>          >>> Thanks!
>          >>>
>          >>> --
>          >>>
>          >>> Rex Fenley  |  Software Engineer - Mobile and Backend
>          >>>
>          >>>
>          >>> Remind.com <https://www.remind.com/> |  BLOG
>         <http://blog.remind.com/>
>          >>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>          >>> <https://www.facebook.com/remindhq>
>          >>>
>          >>
>          >>
>          >> --
>          >>
>          >> Rex Fenley  |  Software Engineer - Mobile and Backend
>          >>
>          >>
>          >> Remind.com <https://www.remind.com/> |  BLOG
>         <http://blog.remind.com/>  |
>          >>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>          >> <https://www.facebook.com/remindhq>
>          >>
>          >
>          >
> 
> 
> 
>     -- 
> 
>     Rex Fenley  |  Software Engineer - Mobile and Backend
> 
> 
>     Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>      |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>     <https://www.facebook.com/remindhq>
> 
> 
> 
> -- 
> 
> Rex Fenley  |  Software Engineer - Mobile and Backend
> 
> 
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
> 


Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Rex Fenley <Re...@remind101.com>.
My jobs normally use the blink planner, but I noticed with this test that
may not be the case.
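
For reference, a minimal sketch of pinning the planner explicitly so the
test can't silently fall back (assuming the Flink 1.11 Scala API):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Request the blink planner explicitly instead of relying on the default.
val settings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv = StreamTableEnvironment.create(env, settings)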

On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <Re...@remind101.com> wrote:

> Flink 1.11.2 with Scala 2.12
>
> Error:
> [info] JobScalaTest:
> [info] - dummy *** FAILED ***
> [info]   org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink  do not match.
> [info] Query schema: [user: BIGINT, product: ROW<`name`
> VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> [info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE',
> 'ANY<ProductItem,
> rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzd
FNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
> amount: INT]
> [info]   at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> [info]   at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> [info]   at
> org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> [info]   at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
> [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
> [info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> [info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>
> Code:
> import com.remind.graph.people.PeopleJobScala
>
> import org.scalatest.funsuite._
> import org.scalatest.BeforeAndAfter
>
> import org.apache.flink.streaming.api.scala.{
> DataStream,
> StreamExecutionEnvironment
> }
> import org.apache.flink.streaming.util.TestStreamEnvironment
> import org.apache.flink.table.runtime.util._
> import org.apache.flink.test.util.AbstractTestBase
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.api.common.state.ListState
> import org.apache.flink.runtime.state.FunctionInitializationContext
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.runtime.state.FunctionSnapshotContext
> import org.apache.flink.types.Row
>
> import java.io.Serializable;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat
> import java.util.concurrent.atomic.AtomicInteger
> import java.{util => ju}
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
> import scala.util.Try
>
> case class Order(user: Long, product: ProductItem, amount: Int) {
> def this() {
> this(0, null, 0)
> }
>
> override def toString(): String = {
> return "Order{" +
> "user=" + user +
> ", product='" + product + '\'' +
> ", amount=" + amount +
> '}';
> }
> }
>
> case class ProductItem(name: String, id: Long) {
> def this() {
> this(null, 0)
> }
>
> override def toString(): String = {
> return "Product{" +
> "name='" + name + '\'' +
> ", id=" + id +
> '}';
> }
> }
>
> class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
> var env: StreamExecutionEnvironment = _
> var tEnv: StreamTableEnvironment = _
>
> before {
> this.env = StreamExecutionEnvironment.getExecutionEnvironment
> this.env.setParallelism(2)
> this.env.getConfig.enableObjectReuse()
> val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
> this.tEnv = StreamTableEnvironment.create(env, setting)
> }
>
> after {
> StreamTestSink.clear()
> // TestValuesTableFactory.clearAllData()
> }
>
> def dateFrom(stringDate: String): java.sql.Date = {
> val date = new SimpleDateFormat("dd/MM/yyyy")
> .parse(stringDate)
> return new java.sql.Date(date.getTime())
> }
>
> def printTable(table: Table) = {
> println(table)
> table.printSchema()
> println(table.getSchema().getFieldNames().mkString(", "))
> }
>
> def printDataStream(dataStream: DataStream[_]) = {
> println(dataStream)
> println(dataStream.dataType)
> }
>
> test("dummy") {
> val orderA: DataStream[Order] = this.env.fromCollection(
> Seq(
> new Order(1L, new ProductItem("beer", 10L), 3),
> new Order(1L, new ProductItem("diaper", 11L), 4),
> new Order(3L, new ProductItem("rubber", 12L), 2)
> )
> )
>
> val orderB: DataStream[Order] = this.env.fromCollection(
> Seq(
> new Order(2L, new ProductItem("pen", 13L), 3),
> new Order(2L, new ProductItem("rubber", 12L), 3),
> new Order(4L, new ProductItem("beer", 10L), 1)
> )
> )
>
> println(orderB)
> println(orderB.dataType)
>
> // convert DataStream to Table
> val tableA =
> this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
> println(tableA)
> tableA.printSchema()
> println(tableA.getSchema().getFieldNames().mkString(", "))
> // register DataStream as Table
> this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
>
> // union the two tables
> val result = this.tEnv.sqlQuery(s"""
> |SELECT * FROM $tableA WHERE amount > 2
> |UNION ALL
> |SELECT * FROM OrderB WHERE amount < 2
> """.stripMargin)
>
> val sink = new StringSink[Order]()
> result.toAppendStream[Order].addSink(sink)
>
> this.env.execute()
>
> val expected = List(
> "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
> "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
> "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
> )
> val results = sink.getResults.sorted
> println("results")
> println(results)
> assert(expected.sorted === results)
> }
> }
>
> /**
> * Taken from:
> https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
> * There's a whole bunch of other test sinks to choose from there.
> */
> object StreamTestSink {
>
> val idCounter: AtomicInteger = new AtomicInteger(0)
>
> val globalResults =
> mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
> val globalRetractResults =
> mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
> val globalUpsertResults =
> mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
> def getNewSinkId: Int = {
> val idx = idCounter.getAndIncrement()
> this.synchronized {
> globalResults.put(
> idx,
> mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
> )
> globalRetractResults.put(
> idx,
> mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
> )
> globalUpsertResults.put(
> idx,
> mutable.HashMap.empty[Int, mutable.Map[String, String]]
> )
> }
> idx
> }
>
> def clear(): Unit = {
> globalResults.clear()
> globalRetractResults.clear()
> globalUpsertResults.clear()
> }
> }
>
> abstract class AbstractExactlyOnceSink[T]
> extends RichSinkFunction[T]
> with CheckpointedFunction {
> protected var resultsState: ListState[String] = _
> protected var localResults: mutable.ArrayBuffer[String] = _
> protected val idx: Int = StreamTestSink.getNewSinkId
>
> protected var globalResults: mutable.Map[Int,
> mutable.ArrayBuffer[String]] = _
> protected var globalRetractResults
> : mutable.Map[Int, mutable.ArrayBuffer[String]] = _
> protected var globalUpsertResults
> : mutable.Map[Int, mutable.Map[String, String]] = _
>
> def isInitialized: Boolean = globalResults != null
>
> override def initializeState(context: FunctionInitializationContext):
> Unit = {
> resultsState = context.getOperatorStateStore
> .getListState(
> new ListStateDescriptor[String]("sink-results", Types.STRING)
> )
>
> localResults = mutable.ArrayBuffer.empty[String]
>
> if (context.isRestored) {
> for (value <- resultsState.get().asScala) {
> localResults += value
> }
> }
>
> val taskId = getRuntimeContext.getIndexOfThisSubtask
> StreamTestSink.synchronized(
> StreamTestSink.globalResults(idx) += (taskId -> localResults)
> )
> }
>
> override def snapshotState(context: FunctionSnapshotContext): Unit = {
> resultsState.clear()
> for (value <- localResults) {
> resultsState.add(value)
> }
> }
>
> protected def clearAndStashGlobalResults(): Unit = {
> if (globalResults == null) {
> StreamTestSink.synchronized {
> globalResults = StreamTestSink.globalResults.remove(idx).get
> globalRetractResults =
> StreamTestSink.globalRetractResults.remove(idx).get
> globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
> }
> }
> }
>
> protected def getResults: List[String] = {
> clearAndStashGlobalResults()
> val result = mutable.ArrayBuffer.empty[String]
> this.globalResults.foreach {
> case (_, list) => result ++= list
> }
> result.toList
> }
> }
>
> final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
> override def invoke(value: T) {
> localResults += value.toString
> }
>
> override def getResults: List[String] = super.getResults
> }
>
>
>
> On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> @Timo: Is this something that would work when using the new type stack? From
>> the message I'm assuming it's using the older type stack.
>>
>> @Rex: Which Flink version are you using and could you maybe post the
>> code snippet that you use to do the conversions?
>>
>> Best,
>> Aljoscha
>>
>> On 02.11.20 06:50, Rex Fenley wrote:
>> > Maybe this is related to this issue?
>> > https://issues.apache.org/jira/browse/FLINK-17683
>> >
>> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <Re...@remind101.com> wrote:
>> >
>> >> Correction, I'm using Scala case classes not strictly Java POJOs just
>> to
>> >> be clear.
>> >>
>> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <Re...@remind101.com> wrote:
>> >>
>> >>> Hello,
>> >>>
>> >>> I keep running into trouble moving between DataStream and SQL with
>> POJOs
>> >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', is there
>> any
>> >>> way to convert them back to POJOs in Flink when converting a SQL
>> Table back
>> >>> to a DataStream?
>> >>>
>> >>> Thanks!
>> >>>
>> >>> --
>> >>>
>> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>>
>> >>>
>> >>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/
>> >
>> >>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> >>> <https://www.facebook.com/remindhq>
>> >>>
>> >>
>> >>
>> >> --
>> >>
>> >> Rex Fenley  |  Software Engineer - Mobile and Backend
>> >>
>> >>
>> >> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>> |
>> >>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> >> <https://www.facebook.com/remindhq>
>> >>
>> >
>> >
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Rex Fenley <Re...@remind101.com>.
Flink 1.11.2 with Scala 2.12

Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info]   org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink  do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name`
VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE',
'ANY<ProductItem,
rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFN
lcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
amount: INT]
[info]   at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info]   at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info]   at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info]   at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

Code:
import com.remind.graph.people.PeopleJobScala

import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter

import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable;
import java.sql.Timestamp;
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
def this() {
this(0, null, 0)
}

override def toString(): String = {
return "Order{" +
"user=" + user +
", product='" + product + '\'' +
", amount=" + amount +
'}';
}
}

case class ProductItem(name: String, id: Long) {
def this() {
this(null, 0)
}

override def toString(): String = {
return "Product{" +
"name='" + name + '\'' +
", id=" + id +
'}';
}
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
var env: StreamExecutionEnvironment = _
var tEnv: StreamTableEnvironment = _

before {
this.env = StreamExecutionEnvironment.getExecutionEnvironment
this.env.setParallelism(2)
this.env.getConfig.enableObjectReuse()
val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
this.tEnv = StreamTableEnvironment.create(env, setting)
}

after {
StreamTestSink.clear()
// TestValuesTableFactory.clearAllData()
}

def dateFrom(stringDate: String): java.sql.Date = {
val date = new SimpleDateFormat("dd/MM/yyyy")
.parse(stringDate)
return new java.sql.Date(date.getTime())
}

def printTable(table: Table) = {
println(table)
table.printSchema()
println(table.getSchema().getFieldNames().mkString(", "))
}

def printDataStream(dataStream: DataStream[_]) = {
println(dataStream)
println(dataStream.dataType)
}

test("dummy") {
val orderA: DataStream[Order] = this.env.fromCollection(
Seq(
new Order(1L, new ProductItem("beer", 10L), 3),
new Order(1L, new ProductItem("diaper", 11L), 4),
new Order(3L, new ProductItem("rubber", 12L), 2)
)
)

val orderB: DataStream[Order] = this.env.fromCollection(
Seq(
new Order(2L, new ProductItem("pen", 13L), 3),
new Order(2L, new ProductItem("rubber", 12L), 3),
new Order(4L, new ProductItem("beer", 10L), 1)
)
)

println(orderB)
println(orderB.dataType)

// convert DataStream to Table
val tableA =
this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
println(tableA)
tableA.printSchema()
println(tableA.getSchema().getFieldNames().mkString(", "))
// register DataStream as Table
this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

// union the two tables
val result = this.tEnv.sqlQuery(s"""
|SELECT * FROM $tableA WHERE amount > 2
|UNION ALL
|SELECT * FROM OrderB WHERE amount < 2
""".stripMargin)

val sink = new StringSink[Order]()
result.toAppendStream[Order].addSink(sink)

this.env.execute()

val expected = List(
"Order{user=1, product='Product{name='beer', id=10}', amount=3}",
"Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
"Order{user=4, product='Product{name='beer', id=10}', amount=1}"
)
val results = sink.getResults.sorted
println("results")
println(results)
assert(expected.sorted === results)
}
}

/**
* Taken from:
https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
* There's a whole bunch of other test sinks to choose from there.
*/
object StreamTestSink {

val idCounter: AtomicInteger = new AtomicInteger(0)

val globalResults =
mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
val globalRetractResults =
mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
val globalUpsertResults =
mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

def getNewSinkId: Int = {
val idx = idCounter.getAndIncrement()
this.synchronized {
globalResults.put(
idx,
mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
)
globalRetractResults.put(
idx,
mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
)
globalUpsertResults.put(
idx,
mutable.HashMap.empty[Int, mutable.Map[String, String]]
)
}
idx
}

def clear(): Unit = {
globalResults.clear()
globalRetractResults.clear()
globalUpsertResults.clear()
}
}

abstract class AbstractExactlyOnceSink[T]
extends RichSinkFunction[T]
with CheckpointedFunction {
protected var resultsState: ListState[String] = _
protected var localResults: mutable.ArrayBuffer[String] = _
protected val idx: Int = StreamTestSink.getNewSinkId

protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]]
= _
protected var globalRetractResults
: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
protected var globalUpsertResults
: mutable.Map[Int, mutable.Map[String, String]] = _

def isInitialized: Boolean = globalResults != null

override def initializeState(context: FunctionInitializationContext): Unit
= {
resultsState = context.getOperatorStateStore
.getListState(
new ListStateDescriptor[String]("sink-results", Types.STRING)
)

localResults = mutable.ArrayBuffer.empty[String]

if (context.isRestored) {
for (value <- resultsState.get().asScala) {
localResults += value
}
}

val taskId = getRuntimeContext.getIndexOfThisSubtask
StreamTestSink.synchronized(
StreamTestSink.globalResults(idx) += (taskId -> localResults)
)
}

override def snapshotState(context: FunctionSnapshotContext): Unit = {
resultsState.clear()
for (value <- localResults) {
resultsState.add(value)
}
}

protected def clearAndStashGlobalResults(): Unit = {
if (globalResults == null) {
StreamTestSink.synchronized {
globalResults = StreamTestSink.globalResults.remove(idx).get
globalRetractResults =
StreamTestSink.globalRetractResults.remove(idx).get
globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
}
}
}

protected def getResults: List[String] = {
clearAndStashGlobalResults()
val result = mutable.ArrayBuffer.empty[String]
this.globalResults.foreach {
case (_, list) => result ++= list
}
result.toList
}
}

final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
override def invoke(value: T) {
localResults += value.toString
}

override def getResults: List[String] = super.getResults
}
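
As an aside, one way to sidestep the LEGACY('STRUCTURED_TYPE') sink mismatch
is to cross the Table-to-DataStream boundary as Row and rebuild the case
class by hand. This is a rough sketch only, reusing the Order/ProductItem
case classes above and assuming the field positions of the query schema
(user, product, amount):

val rowStream: DataStream[Row] = result.toAppendStream[Row]
val orderStream: DataStream[Order] = rowStream.map { row =>
  // The nested ROW<`name`, `id`> arrives as a nested Row at runtime.
  val product = row.getField(1).asInstanceOf[Row]
  Order(
    row.getField(0).asInstanceOf[Long],
    ProductItem(
      product.getField(0).asInstanceOf[String],
      product.getField(1).asInstanceOf[Long]
    ),
    row.getField(2).asInstanceOf[Int]
  )
}

The sink could then consume orderStream in place of the direct
toAppendStream[Order] result.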



On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <al...@apache.org> wrote:

> @Timo: Is this something that would work when using the new type stack? From
> the message I'm assuming it's using the older type stack.
>
> @Rex: Which Flink version are you using and could you maybe post the
> code snippet that you use to do the conversions?
>
> Best,
> Aljoscha
>
> On 02.11.20 06:50, Rex Fenley wrote:
> > Maybe this is related to this issue?
> > https://issues.apache.org/jira/browse/FLINK-17683
> >
> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <Re...@remind101.com> wrote:
> >
> >> Correction, I'm using Scala case classes not strictly Java POJOs just to
> >> be clear.
> >>
> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <Re...@remind101.com> wrote:
> >>
> >>> Hello,
> >>>
> >>> I keep running into trouble moving between DataStream and SQL with
> POJOs
> >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', is there
> any
> >>> way to convert them back to POJOs in Flink when converting a SQL Table
> back
> >>> to a DataStream?
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>>
> >>> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>>
> >>>
> >>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
> >>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> >>> <https://www.facebook.com/remindhq>
> >>>
> >>
> >>
> >> --
> >>
> >> Rex Fenley  |  Software Engineer - Mobile and Backend
> >>
> >>
> >> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
> |
> >>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> >> <https://www.facebook.com/remindhq>
> >>
> >
> >
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: LEGACY('STRUCTURED_TYPE' to pojo

Posted by Aljoscha Krettek <al...@apache.org>.
@Timo: Is this something that would work when using the new type stack? From 
the message I'm assuming it's using the older type stack.

@Rex: Which Flink version are you using and could you maybe post the 
code snippet that you use to do the conversions?

Best,
Aljoscha

On 02.11.20 06:50, Rex Fenley wrote:
> Maybe this is related to this issue?
> https://issues.apache.org/jira/browse/FLINK-17683
> 
> On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <Re...@remind101.com> wrote:
> 
>> Correction, I'm using Scala case classes not strictly Java POJOs just to
>> be clear.
>>
>> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I keep running into trouble moving between DataStream and SQL with POJOs
>>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', is there any
>>> way to convert them back to POJOs in Flink when converting a SQL Table back
>>> to a DataStream?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
> 
>