Posted to user@flink.apache.org by Steve Robert <co...@gmail.com> on 2019/09/04 13:56:48 UTC
TABLE API + DataStream outsourcing schema or Pojo?
Hi guys,
I have been studying the Table API for a while to integrate it into my system.
Looking at this documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I understand that it is possible to apply the JSON format on a connector
and supply a JSON schema without any hardcoded Java POJO:
.jsonSchema(
    "{" +
    "  type: 'object'," +
    "  properties: {" +
    "    lon: {" +
    "      type: 'number'" +
    "    }," +
    "    rideTime: {" +
    "      type: 'string'," +
    "      format: 'date-time'" +
    "    }" +
    "  }" +
    "}"
)
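For reference, a minimal sketch of how such a format descriptor plugs into a
connector with the 1.9 Table API (Kafka, the topic name, and the field list
are made up for illustration; Types is org.apache.flink.api.common.typeinfo.Types):

tEnv.connect(
        new Kafka()
            .version("universal")
            .topic("rides")                                  // example topic
            .property("bootstrap.servers", "localhost:9092"))
    .withFormat(
        new Json().jsonSchema(jsonSchemaString))             // the JSON schema shown above
    .withSchema(
        new Schema()
            .field("lon", Types.DOUBLE)
            .field("rideTime", Types.SQL_TIMESTAMP))
    .inAppendMode()
    .registerTableSource("Rides");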
But my problem is the following: my data comes from a REST API, so I have to
process the data and pass it on as a DataStream.
The problem is that the conversion from a DataStream to a Table has to go
through a Java POJO:
DataStream<YourPojo> input = ...;
Table table = tEnv.fromDataStream(input);
I tried a trick of converting my JSON to Avro using a GenericRecord, but that
does not seem possible.
My use case is to add REST API processing at runtime and to outsource and
dynamically load my POJO/schema without hardcoding a Java POJO class.
Do you have an approach to suggest?
Thanks a lot
Re: TABLE API + DataStream outsourcing schema or Pojo?
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Steve,
The in-memory catalog does not persist metadata and needs to be repopulated
every time.
However, you can implement a catalog that persists the metadata to a file
or a database.
There is an effort to implement the Catalog interface on top of Hive's Metastore.
A preview is available in the latest release (1.9.0).
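A rough sketch of that idea, assuming a hypothetical FileBackedCatalog that
extends GenericInMemoryCatalog and additionally writes metadata out in
createTable (the class name and persistence details are illustrative, not an
existing Flink class):

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

// Hypothetical: keeps metadata in memory like GenericInMemoryCatalog,
// but also persists it so it survives a restart.
public class FileBackedCatalog extends GenericInMemoryCatalog {

    public FileBackedCatalog(String name) {
        super(name);
        // Illustrative: reload previously persisted databases/tables here.
    }

    @Override
    public void createTable(ObjectPath path, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException {
        super.createTable(path, table, ignoreIfExists);
        // Illustrative: serialize the table's schema and properties to a file or database here.
    }
}

Such a catalog would then be registered per job, e.g. with
tEnv.registerCatalog("persistent", new FileBackedCatalog("persistent")).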
Best, Fabian
Re: TABLE API + DataStream outsourcing schema or Pojo?
Posted by Steve Robert <co...@gmail.com>.
Hi Fabian,
Thank you for your answer; that is indeed the solution I am currently testing.
I use TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);
provided by flink-json and pass the resulting TypeInformation to the stream
operator. It looks like it works :) With this solution my schema can live
outside my package.
One additional question about GenericInMemoryCatalog
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html):
can a catalog be used across multiple jobs running on the same cluster, or is
it scoped to the job session only?
DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String s) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readTree(s);
    }
});

DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
    @Override
    public Row map(JsonNode jsonNode) throws Exception {
        int pos = 0;
        Row row = new Row(jsonNode.size());
        Iterator<String> iterator = jsonNode.fieldNames();
        while (iterator.hasNext()) {
            String key = iterator.next();
            row.setField(pos, jsonNode.get(key).asText());
            pos++;
        }
        return row;
    }
}).returns(convert);
Table tableA = tEnv.fromDataStream(dataStreamRow);
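As a quick usage sketch (the table name "RestData" is just an example), the
resulting table could then be registered and queried with plain SQL:

tEnv.registerTable("RestData", tableA);
Table result = tEnv.sqlQuery("SELECT * FROM RestData");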
Re: TABLE API + DataStream outsourcing schema or Pojo?
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Steve,
Maybe you could implement a custom TableSource that queries the data from
the REST API and converts the JSON directly into a Row data type.
This would also avoid going through the DataStream API just for ingesting
the data.
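A rough sketch of that idea against the 1.9 interfaces, reusing the
JSON-schema-driven Row conversion from the other mail; RestApiSourceFunction
stands in for a hypothetical SourceFunction<String> that polls the REST endpoint:

import java.util.Iterator;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class RestApiTableSource implements StreamTableSource<Row> {

    private final TypeInformation<Row> rowType;

    public RestApiTableSource(String jsonSchema) {
        // Derive the row type from an external JSON schema string, no hardcoded POJO.
        this.rowType = JsonRowSchemaConverter.convert(jsonSchema);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // RestApiSourceFunction is a hypothetical SourceFunction<String> polling the REST API.
        return env.addSource(new RestApiSourceFunction())
                  .map(RestApiTableSource::toRow)
                  .returns(rowType);
    }

    // Same field-by-field JSON -> Row conversion as in the DataStream snippet.
    private static Row toRow(String json) throws Exception {
        JsonNode node = new ObjectMapper().readTree(json);
        Row row = new Row(node.size());
        int pos = 0;
        Iterator<String> fields = node.fieldNames();
        while (fields.hasNext()) {
            row.setField(pos++, node.get(fields.next()).asText());
        }
        return row;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return rowType;
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(rowType);
    }
}

Registered with tEnv.registerTableSource("rest_data", new RestApiTableSource(jsonSchemaString)),
the table can then be queried without any generated POJO.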
Best, Fabian