You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexis Sarda <al...@gmail.com> on 2018/08/07 20:32:14 UTC

JDBCInputFormat and SplitDataProperties

Hi everyone,

I have the following scenario: I have a database table with 3 columns: a
host (string), a timestamp, and an integer ID. Conceptually, what I'd like
to do is:

group by host and timestamp -> based on all the IDs in each group, create a
mapping to n new tuples -> for each unique tuple, count how many times it
appeared across the resulting data

Each new tuple has 3 fields: the host, a new ID, and an Integer=1

What I'm currently doing is roughly:

val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
val source = environment.createInput(inut)
source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
1).aggregate(SUM, 2)

The query given to JDBCInputFormat provides results ordered by host and
timestamp, and I was wondering if performance can be improved by specifying
this in the code. I've looked at
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
and
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
but I still have some questions:

- If a split is a subset of a partition, what is the meaning of
SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a
split is divided into partitions, meaning that a partition would be a
subset of a split.
- At which point can I retrieve and adjust a SplitDataProperties instance,
if possible at all?
- If I wanted a coarser parallelization where each slot gets all the data
for the same host, would I have to manually create the sub-groups based on
timestamp?

Regards,
Alexis.

Re: JDBCInputFormat and SplitDataProperties

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Alexis,

Yes, the job cannot be executed until the required number of processing
slots becomes available.
IIRC, there is a timeout and a job gets canceled once the waiting time
exceeds the threshold.

Best, Fabian

2018-08-10 15:35 GMT+02:00 Alexis Sarda <al...@gmail.com>:

> It ended up being a wrong configuration of the cluster; there was only 1
> task manager with 1 slot.
>
> If I submit a job with "flink run -p 24 ...", will the job hang until at
> least 24 slots are available?
>
> Regards,
> Alexis.
>
> On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fh...@gmail.com> wrote:
>
>> Can you share the plan for the program?
>>
>> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>>
>> 2018-08-10 12:04 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>
>>> It seems I may have spoken too soon. After executing the job with more
>>> data, I can see the following things in the Flink dashboard:
>>>
>>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>>> parallelism set to 24 and a ParameterValuesProvider returning
>>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>>> - The second subtask is a Sorted Group Reduce, and I see two weird
>>> things:
>>>   + The first subtask sent 5,923,802 records, yet the second subtask
>>> only received 5,575,154 records?
>>>   + Again, everything was done in a single thread, even though a groupBy
>>> was used.
>>> - The third and final subtask is a sink that saves back to the database.
>>>
>>> Does anyone know why parallelism is not being used?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <al...@gmail.com>
>>> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks a lot for the help. The scala DataSet, at least in version
>>>> 1.5.0, declares javaSet as private[flink], so I cannot access it directly.
>>>> Nevertheless, I managed to get around it by using the java environment:
>>>>
>>>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecut
>>>> ionEnvironment
>>>>
>>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>>
>>>> val source = env.createInput(inputFormat)
>>>> val sdp = source.getSplitDataProperties
>>>> sdp.splitsPartitionedBy(0)
>>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>>
>>>> // transform java DataSet to scala DataSet...
>>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>>   .groupBy(0, 1)
>>>>   .combineGroup(groupCombiner)
>>>>   .withForwardedFields("f0->_1")
>>>>   .groupBy(0, 1)
>>>>   .reduceGroup(groupReducer)
>>>>   .withForwardedFields("_1")
>>>>   .output(outputFormat)
>>>>
>>>> It seems to work well, and the semantic annotation does remove a hash
>>>> partition from the execution plan.
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> The Scala API does not expose a DataSource object but only a Scala
>>>>> DataSet which wraps the Java object.
>>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>>
>>>>> val dbData: DataSet[...] = ???
>>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].
>>>>> getSplitDataProperties
>>>>>
>>>>> So you first have to get the wrapped Java DataSet, cast it to
>>>>> DataSource and then get the properties.
>>>>> It's not very nice, but should work.
>>>>>
>>>>> In order to use SDPs, you should be a bit familiar how physical data
>>>>> properties are propagated and discarded in the optimizer.
>>>>> For example, applying a simple MapFunction removes all properties
>>>>> because the function might have changed the fields on which a DataSet is
>>>>> partitioned or sorted.
>>>>> You can expose the behavior of a function to the optimizer by using
>>>>> Semantic Annotations [1]
>>>>>
>>>>> Some comments on the code and plan you shared:
>>>>> - You might want to add hostname to ORDER BY to have the output
>>>>> grouped by (ts, hostname).
>>>>> - Check the Global and Local data properties in the plan to validate
>>>>> that the SDP were correctly interpreted.
>>>>> - If the data is already correctly partitioned and sorted, you might
>>>>> not need the Combiners. In either case, you properly want to annotate them
>>>>> with Forward Field annoations.
>>>>>
>>>>> The number of source tasks is unrelated to the number of splits. If
>>>>> you have more tasks than splits, some tasks won't process any data.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>>>> release-1.5/dev/batch/#semantic-annotations
>>>>>
>>>>>
>>>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Thanks for the clarification. I have a few remarks, but let me
>>>>>> provide more concrete information. You can find the query I'm using, the
>>>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>>>
>>>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>>>
>>>>>> I cannot call getSplitDataProperties because
>>>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>>>> code, I do this instead:
>>>>>>
>>>>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.
>>>>>> getExecutionEnvironment
>>>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>>>>> "example")
>>>>>>
>>>>>> which feels wrong (the constructor doesn't accept a Scala
>>>>>> environment). Is there a better alternative?
>>>>>>
>>>>>> I see absolutely no difference in the execution plan whether I use
>>>>>> SDP or not, so therefore the results are indeed the same. Is this expected?
>>>>>>
>>>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>>>> that the constructor for GenericInputSplit takes two parameters:
>>>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>>>>> 2 splits divided into 24 partitions?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Alexis,
>>>>>>>
>>>>>>> First of all, I think you leverage the partitioning and sorting
>>>>>>> properties of the data returned by the database using SplitDataProperties.
>>>>>>> However, please be aware that SplitDataProperties are a rather
>>>>>>> experimental feature.
>>>>>>>
>>>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>>>> single split and queries the database just once. If you want to leverage
>>>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>>>> clause to read different parts of the table.
>>>>>>> Note, depending on the configuration of the database, multiple
>>>>>>> queries result in multiple full scans. Hence, it might make sense to have
>>>>>>> an index on the partitioning columns.
>>>>>>>
>>>>>>> If properly configured, the JDBCInputFormat generates multiple
>>>>>>> splits which are partitioned. Since the partitioning is encoded in the
>>>>>>> query, it is opaque to Flink and must be explicitly declared.
>>>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method
>>>>>>> tells Flink that all records with the same value in the partitioning field
>>>>>>> are read from the same split, i.e, the full data is partitioned on the
>>>>>>> attribute across splits.
>>>>>>> The same can be done for ordering if the queries of the
>>>>>>> JDBCInputFormat is specified with an ORDER BY clause.
>>>>>>> Partitioning and grouping are two different things. You can define a
>>>>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>>>>> declare these properties in the SDP.
>>>>>>>
>>>>>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>>>>>> In your example this would be source.getSplitDataProperties().
>>>>>>>
>>>>>>> Whatever you do, you should carefully check the execution plan
>>>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer
>>>>>>> [1] and validate that the result are identical whether you use SDP or not.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> [1] https://flink.apache.org/visualizer/
>>>>>>>
>>>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I have the following scenario: I have a database table with 3
>>>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>>>> what I'd like to do is:
>>>>>>>>
>>>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>>>>> times it appeared across the resulting data
>>>>>>>>
>>>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>>>
>>>>>>>> What I'm currently doing is roughly:
>>>>>>>>
>>>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>>>> val source = environment.createInput(inut)
>>>>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>>>>>>>> 1).aggregate(SUM, 2)
>>>>>>>>
>>>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>>>> specifying this in the code. I've looked at
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.
>>>>>>>> n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>>>>> and http://apache-flink-user-mailing-list-archive.2336050.
>>>>>>>> n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>>>>> but I still have some questions:
>>>>>>>>
>>>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me
>>>>>>>> thing that a split is divided into partitions, meaning that a partition
>>>>>>>> would be a subset of a split.
>>>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>>>> instance, if possible at all?
>>>>>>>> - If I wanted a coarser parallelization where each slot gets all
>>>>>>>> the data for the same host, would I have to manually create the sub-groups
>>>>>>>> based on timestamp?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Alexis.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>

Re: JDBCInputFormat and SplitDataProperties

Posted by Alexis Sarda <al...@gmail.com>.
It ended up being a wrong configuration of the cluster; there was only 1
task manager with 1 slot.

If I submit a job with "flink run -p 24 ...", will the job hang until at
least 24 slots are available?

Regards,
Alexis.

On Fri, 10 Aug 2018, 14:01 Fabian Hueske <fh...@gmail.com> wrote:

> Can you share the plan for the program?
>
> Are you sure that more than 1 split is generated by the JdbcInputFormat?
>
> 2018-08-10 12:04 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>
>> It seems I may have spoken too soon. After executing the job with more
>> data, I can see the following things in the Flink dashboard:
>>
>> - The first subtask is a chained DataSource -> GroupCombine. Even with
>> parallelism set to 24 and a ParameterValuesProvider returning
>> Array(Array("first"), Array("second")), only 1 thread processed all records.
>> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>>   + The first subtask sent 5,923,802 records, yet the second subtask only
>> received 5,575,154 records?
>>   + Again, everything was done in a single thread, even though a groupBy
>> was used.
>> - The third and final subtask is a sink that saves back to the database.
>>
>> Does anyone know why parallelism is not being used?
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <al...@gmail.com>
>> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>>> declares javaSet as private[flink], so I cannot access it directly.
>>> Nevertheless, I managed to get around it by using the java environment:
>>>
>>> val env = org.apache.flink.api.java.ExecutionEnvironment.
>>> getExecutionEnvironment
>>>
>>> val inputFormat = getInputFormat(query, dbUrl, properties)
>>> val outputFormat = getOutputFormat(dbUrl, properties)
>>>
>>> val source = env.createInput(inputFormat)
>>> val sdp = source.getSplitDataProperties
>>> sdp.splitsPartitionedBy(0)
>>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>>
>>> // transform java DataSet to scala DataSet...
>>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>>   .groupBy(0, 1)
>>>   .combineGroup(groupCombiner)
>>>   .withForwardedFields("f0->_1")
>>>   .groupBy(0, 1)
>>>   .reduceGroup(groupReducer)
>>>   .withForwardedFields("_1")
>>>   .output(outputFormat)
>>>
>>> It seems to work well, and the semantic annotation does remove a hash
>>> partition from the execution plan.
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> The Scala API does not expose a DataSource object but only a Scala
>>>> DataSet which wraps the Java object.
>>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>>
>>>> val dbData: DataSet[...] = ???
>>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>>
>>>> So you first have to get the wrapped Java DataSet, cast it to
>>>> DataSource and then get the properties.
>>>> It's not very nice, but should work.
>>>>
>>>> In order to use SDPs, you should be a bit familiar how physical data
>>>> properties are propagated and discarded in the optimizer.
>>>> For example, applying a simple MapFunction removes all properties
>>>> because the function might have changed the fields on which a DataSet is
>>>> partitioned or sorted.
>>>> You can expose the behavior of a function to the optimizer by using
>>>> Semantic Annotations [1]
>>>>
>>>> Some comments on the code and plan you shared:
>>>> - You might want to add hostname to ORDER BY to have the output grouped
>>>> by (ts, hostname).
>>>> - Check the Global and Local data properties in the plan to validate
>>>> that the SDP were correctly interpreted.
>>>> - If the data is already correctly partitioned and sorted, you might
>>>> not need the Combiners. In either case, you properly want to annotate them
>>>> with Forward Field annoations.
>>>>
>>>> The number of source tasks is unrelated to the number of splits. If you
>>>> have more tasks than splits, some tasks won't process any data.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>>>
>>>>
>>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for the clarification. I have a few remarks, but let me provide
>>>>> more concrete information. You can find the query I'm using, the
>>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>>
>>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>>
>>>>> I cannot call getSplitDataProperties because
>>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>>> code, I do this instead:
>>>>>
>>>>> val javaEnv =
>>>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>>>> "example")
>>>>>
>>>>> which feels wrong (the constructor doesn't accept a Scala
>>>>> environment). Is there a better alternative?
>>>>>
>>>>> I see absolutely no difference in the execution plan whether I use SDP
>>>>> or not, so therefore the results are indeed the same. Is this expected?
>>>>>
>>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>>> that the constructor for GenericInputSplit takes two parameters:
>>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>>>> 2 splits divided into 24 partitions?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Alexis,
>>>>>>
>>>>>> First of all, I think you leverage the partitioning and sorting
>>>>>> properties of the data returned by the database using SplitDataProperties.
>>>>>> However, please be aware that SplitDataProperties are a rather
>>>>>> experimental feature.
>>>>>>
>>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>>> single split and queries the database just once. If you want to leverage
>>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>>> clause to read different parts of the table.
>>>>>> Note, depending on the configuration of the database, multiple
>>>>>> queries result in multiple full scans. Hence, it might make sense to have
>>>>>> an index on the partitioning columns.
>>>>>>
>>>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>>>> which are partitioned. Since the partitioning is encoded in the query, it
>>>>>> is opaque to Flink and must be explicitly declared.
>>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method
>>>>>> tells Flink that all records with the same value in the partitioning field
>>>>>> are read from the same split, i.e, the full data is partitioned on the
>>>>>> attribute across splits.
>>>>>> The same can be done for ordering if the queries of the
>>>>>> JDBCInputFormat is specified with an ORDER BY clause.
>>>>>> Partitioning and grouping are two different things. You can define a
>>>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>>>> declare these properties in the SDP.
>>>>>>
>>>>>> You can get a SDP object by calling
>>>>>> DataSource.getSplitDataProperties(). In your example this would be
>>>>>> source.getSplitDataProperties().
>>>>>>
>>>>>> Whatever you do, you should carefully check the execution plan
>>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
>>>>>> validate that the result are identical whether you use SDP or not.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://flink.apache.org/visualizer/
>>>>>>
>>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I have the following scenario: I have a database table with 3
>>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>>> what I'd like to do is:
>>>>>>>
>>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>>>> times it appeared across the resulting data
>>>>>>>
>>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>>
>>>>>>> What I'm currently doing is roughly:
>>>>>>>
>>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>>> val source = environment.createInput(inut)
>>>>>>> source.partitionByHash("host",
>>>>>>> "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>>>
>>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>>> specifying this in the code. I've looked at
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>>>> and
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>>>> but I still have some questions:
>>>>>>>
>>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a
>>>>>>> split is divided into partitions, meaning that a partition would be a
>>>>>>> subset of a split.
>>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>>> instance, if possible at all?
>>>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>>>> data for the same host, would I have to manually create the sub-groups
>>>>>>> based on timestamp?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Alexis.
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>

Re: JDBCInputFormat and SplitDataProperties

Posted by Fabian Hueske <fh...@gmail.com>.
Can you share the plan for the program?

Are you sure that more than 1 split is generated by the JdbcInputFormat?

2018-08-10 12:04 GMT+02:00 Alexis Sarda <al...@gmail.com>:

> It seems I may have spoken too soon. After executing the job with more
> data, I can see the following things in the Flink dashboard:
>
> - The first subtask is a chained DataSource -> GroupCombine. Even with
> parallelism set to 24 and a ParameterValuesProvider returning
> Array(Array("first"), Array("second")), only 1 thread processed all records.
> - The second subtask is a Sorted Group Reduce, and I see two weird things:
>   + The first subtask sent 5,923,802 records, yet the second subtask only
> received 5,575,154 records?
>   + Again, everything was done in a single thread, even though a groupBy
> was used.
> - The third and final subtask is a sink that saves back to the database.
>
> Does anyone know why parallelism is not being used?
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <al...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
>> declares javaSet as private[flink], so I cannot access it directly.
>> Nevertheless, I managed to get around it by using the java environment:
>>
>> val env = org.apache.flink.api.java.ExecutionEnvironment.getExecut
>> ionEnvironment
>>
>> val inputFormat = getInputFormat(query, dbUrl, properties)
>> val outputFormat = getOutputFormat(dbUrl, properties)
>>
>> val source = env.createInput(inputFormat)
>> val sdp = source.getSplitDataProperties
>> sdp.splitsPartitionedBy(0)
>> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>>
>> // transform java DataSet to scala DataSet...
>> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>>   .groupBy(0, 1)
>>   .combineGroup(groupCombiner)
>>   .withForwardedFields("f0->_1")
>>   .groupBy(0, 1)
>>   .reduceGroup(groupReducer)
>>   .withForwardedFields("_1")
>>   .output(outputFormat)
>>
>> It seems to work well, and the semantic annotation does remove a hash
>> partition from the execution plan.
>>
>> Regards,
>> Alexis.
>>
>>
>> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> The Scala API does not expose a DataSource object but only a Scala
>>> DataSet which wraps the Java object.
>>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>>
>>> val dbData: DataSet[...] = ???
>>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>>
>>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>>> and then get the properties.
>>> It's not very nice, but should work.
>>>
>>> In order to use SDPs, you should be a bit familiar how physical data
>>> properties are propagated and discarded in the optimizer.
>>> For example, applying a simple MapFunction removes all properties
>>> because the function might have changed the fields on which a DataSet is
>>> partitioned or sorted.
>>> You can expose the behavior of a function to the optimizer by using
>>> Semantic Annotations [1]
>>>
>>> Some comments on the code and plan you shared:
>>> - You might want to add hostname to ORDER BY to have the output grouped
>>> by (ts, hostname).
>>> - Check the Global and Local data properties in the plan to validate
>>> that the SDP were correctly interpreted.
>>> - If the data is already correctly partitioned and sorted, you might not
>>> need the Combiners. In either case, you properly want to annotate them with
>>> Forward Field annoations.
>>>
>>> The number of source tasks is unrelated to the number of splits. If you
>>> have more tasks than splits, some tasks won't process any data.
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.5/dev/batch/#semantic-annotations
>>>
>>>
>>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for the clarification. I have a few remarks, but let me provide
>>>> more concrete information. You can find the query I'm using, the
>>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>>
>>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>>
>>>> I cannot call getSplitDataProperties because
>>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>>> code, I do this instead:
>>>>
>>>> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.
>>>> getExecutionEnvironment
>>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>>> "example")
>>>>
>>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>>> Is there a better alternative?
>>>>
>>>> I see absolutely no difference in the execution plan whether I use SDP
>>>> or not, so therefore the results are indeed the same. Is this expected?
>>>>
>>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>>> that the constructor for GenericInputSplit takes two parameters:
>>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>>> 2 splits divided into 24 partitions?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Alexis,
>>>>>
>>>>> First of all, I think you leverage the partitioning and sorting
>>>>> properties of the data returned by the database using SplitDataProperties.
>>>>> However, please be aware that SplitDataProperties are a rather
>>>>> experimental feature.
>>>>>
>>>>> If used without query parameters, the JDBCInputFormat generates a
>>>>> single split and queries the database just once. If you want to leverage
>>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>>> clause to read different parts of the table.
>>>>> Note, depending on the configuration of the database, multiple queries
>>>>> result in multiple full scans. Hence, it might make sense to have an index
>>>>> on the partitioning columns.
>>>>>
>>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>>> which are partitioned. Since the partitioning is encoded in the query, it
>>>>> is opaque to Flink and must be explicitly declared.
>>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>>>> Flink that all records with the same value in the partitioning field are
>>>>> read from the same split, i.e, the full data is partitioned on the
>>>>> attribute across splits.
>>>>> The same can be done for ordering if the queries of the
>>>>> JDBCInputFormat is specified with an ORDER BY clause.
>>>>> Partitioning and grouping are two different things. You can define a
>>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>>> declare these properties in the SDP.
>>>>>
>>>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>>>> In your example this would be source.getSplitDataProperties().
>>>>>
>>>>> Whatever you do, you should carefully check the execution plan
>>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer
>>>>> [1] and validate that the result are identical whether you use SDP or not.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://flink.apache.org/visualizer/
>>>>>
>>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I have the following scenario: I have a database table with 3
>>>>>> columns: a host (string), a timestamp, and an integer ID. Conceptually,
>>>>>> what I'd like to do is:
>>>>>>
>>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>>> times it appeared across the resulting data
>>>>>>
>>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>>
>>>>>> What I'm currently doing is roughly:
>>>>>>
>>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>>> val source = environment.createInput(inut)
>>>>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>>>>>> 1).aggregate(SUM, 2)
>>>>>>
>>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>>> specifying this in the code. I've looked at http://apache-flink-user-
>>>>>> mailing-list-archive.2336050.n4.nabble.com/Terminology-
>>>>>> Split-Group-and-Partition-td11030.html and http://apache-flink-user-
>>>>>> mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-
>>>>>> Sorted-Input-Datasets-td20038.html, but I still have some questions:
>>>>>>
>>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing
>>>>>> that a split is divided into partitions, meaning that a partition would be
>>>>>> a subset of a split.
>>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>>> instance, if possible at all?
>>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>>> data for the same host, would I have to manually create the sub-groups
>>>>>> based on timestamp?
>>>>>>
>>>>>> Regards,
>>>>>> Alexis.
>>>>>>
>>>>>>
>>>>>
>>>

Re: JDBCInputFormat and SplitDataProperties

Posted by Alexis Sarda <al...@gmail.com>.
It seems I may have spoken too soon. After executing the job with more
data, I can see the following things in the Flink dashboard:

- The first subtask is a chained DataSource -> GroupCombine. Even with
parallelism set to 24 and a ParameterValuesProvider returning
Array(Array("first"), Array("second")), only 1 thread processed all records.
- The second subtask is a Sorted Group Reduce, and I see two weird things:
  + The first subtask sent 5,923,802 records, yet the second subtask only
received 5,575,154 records?
  + Again, everything was done in a single thread, even though a groupBy
was used.
- The third and final subtask is a sink that saves back to the database.

Does anyone know why parallelism is not being used?

Regards,
Alexis.


On Thu, Aug 9, 2018 at 11:22 AM Alexis Sarda <al...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
> declares javaSet as private[flink], so I cannot access it directly.
> Nevertheless, I managed to get around it by using the java environment:
>
> val env = org.apache.flink.api.java.ExecutionEnvironment.
> getExecutionEnvironment
>
> val inputFormat = getInputFormat(query, dbUrl, properties)
> val outputFormat = getOutputFormat(dbUrl, properties)
>
> val source = env.createInput(inputFormat)
> val sdp = source.getSplitDataProperties
> sdp.splitsPartitionedBy(0)
> sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
>
> // transform java DataSet to scala DataSet...
> new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
>   .groupBy(0, 1)
>   .combineGroup(groupCombiner)
>   .withForwardedFields("f0->_1")
>   .groupBy(0, 1)
>   .reduceGroup(groupReducer)
>   .withForwardedFields("_1")
>   .output(outputFormat)
>
> It seems to work well, and the semantic annotation does remove a hash
> partition from the execution plan.
>
> Regards,
> Alexis.
>
>
> On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Alexis,
>>
>> The Scala API does not expose a DataSource object but only a Scala
>> DataSet which wraps the Java object.
>> You can get the SplitDataProperties from the Scala DataSet as follows:
>>
>> val dbData: DataSet[...] = ???
>> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>>
>> So you first have to get the wrapped Java DataSet, cast it to DataSource
>> and then get the properties.
>> It's not very nice, but should work.
>>
>> In order to use SDPs, you should be a bit familiar how physical data
>> properties are propagated and discarded in the optimizer.
>> For example, applying a simple MapFunction removes all properties because
>> the function might have changed the fields on which a DataSet is
>> partitioned or sorted.
>> You can expose the behavior of a function to the optimizer by using
>> Semantic Annotations [1]
>>
>> Some comments on the code and plan you shared:
>> - You might want to add hostname to ORDER BY to have the output grouped
>> by (ts, hostname).
>> - Check the Global and Local data properties in the plan to validate that
>> the SDP were correctly interpreted.
>> - If the data is already correctly partitioned and sorted, you might not
>> need the Combiners. In either case, you properly want to annotate them with
>> Forward Field annoations.
>>
>> The number of source tasks is unrelated to the number of splits. If you
>> have more tasks than splits, some tasks won't process any data.
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>>
>>
>> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the clarification. I have a few remarks, but let me provide
>>> more concrete information. You can find the query I'm using, the
>>> JDBCInputFormat creation, and the execution plan in this github gist:
>>>
>>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>>
>>> I cannot call getSplitDataProperties because
>>> env.createInput(inputFormat) returns a DataSet, not a DataSource. In the
>>> code, I do this instead:
>>>
>>> val javaEnv =
>>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>>> "example")
>>>
>>> which feels wrong (the constructor doesn't accept a Scala environment).
>>> Is there a better alternative?
>>>
>>> I see absolutely no difference in the execution plan whether I use SDP
>>> or not, so therefore the results are indeed the same. Is this expected?
>>>
>>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>>> that the constructor for GenericInputSplit takes two parameters:
>>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>>> 2 splits divided into 24 partitions?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi Alexis,
>>>>
>>>> First of all, I think you leverage the partitioning and sorting
>>>> properties of the data returned by the database using SplitDataProperties.
>>>> However, please be aware that SplitDataProperties are a rather
>>>> experimental feature.
>>>>
>>>> If used without query parameters, the JDBCInputFormat generates a
>>>> single split and queries the database just once. If you want to leverage
>>>> parallelism, you have to specify a query with parameters in the WHERE
>>>> clause to read different parts of the table.
>>>> Note, depending on the configuration of the database, multiple queries
>>>> result in multiple full scans. Hence, it might make sense to have an index
>>>> on the partitioning columns.
>>>>
>>>> If properly configured, the JDBCInputFormat generates multiple splits
>>>> which are partitioned. Since the partitioning is encoded in the query, it
>>>> is opaque to Flink and must be explicitly declared.
>>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>>> Flink that all records with the same value in the partitioning field are
>>>> read from the same split, i.e, the full data is partitioned on the
>>>> attribute across splits.
>>>> The same can be done for ordering if the queries of the JDBCInputFormat
>>>> is specified with an ORDER BY clause.
>>>> Partitioning and grouping are two different things. You can define a
>>>> query that partitions on hostname and orders by hostname and timestamp and
>>>> declare these properties in the SDP.
>>>>
>>>> You can get a SDP object by calling
>>>> DataSource.getSplitDataProperties(). In your example this would be
>>>> source.getSplitDataProperties().
>>>>
>>>> Whatever you do, you should carefully check the execution plan
>>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
>>>> validate that the result are identical whether you use SDP or not.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/visualizer/
>>>>
>>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I have the following scenario: I have a database table with 3 columns:
>>>>> a host (string), a timestamp, and an integer ID. Conceptually, what I'd
>>>>> like to do is:
>>>>>
>>>>> group by host and timestamp -> based on all the IDs in each group,
>>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>>> times it appeared across the resulting data
>>>>>
>>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>>
>>>>> What I'm currently doing is roughly:
>>>>>
>>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>>> val source = environment.createInput(inut)
>>>>> source.partitionByHash("host",
>>>>> "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>>
>>>>> The query given to JDBCInputFormat provides results ordered by host
>>>>> and timestamp, and I was wondering if performance can be improved by
>>>>> specifying this in the code. I've looked at
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>>> and
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>>> but I still have some questions:
>>>>>
>>>>> - If a split is a subset of a partition, what is the meaning of
>>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a
>>>>> split is divided into partitions, meaning that a partition would be a
>>>>> subset of a split.
>>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>>> instance, if possible at all?
>>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>>> data for the same host, would I have to manually create the sub-groups
>>>>> based on timestamp?
>>>>>
>>>>> Regards,
>>>>> Alexis.
>>>>>
>>>>>
>>>>
>>

Re: JDBCInputFormat and SplitDataProperties

Posted by Alexis Sarda <al...@gmail.com>.
Hi Fabian,

Thanks a lot for the help. The scala DataSet, at least in version 1.5.0,
declares javaSet as private[flink], so I cannot access it directly.
Nevertheless, I managed to get around it by using the java environment:

val env = org.apache.flink.api.java.ExecutionEnvironment.
getExecutionEnvironment

val inputFormat = getInputFormat(query, dbUrl, properties)
val outputFormat = getOutputFormat(dbUrl, properties)

val source = env.createInput(inputFormat)
val sdp = source.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))

// transform java DataSet to scala DataSet...
new DataSet(source.asInstanceOf[org.apache.flink.api.java.DataSet[Row]])
  .groupBy(0, 1)
  .combineGroup(groupCombiner)
  .withForwardedFields("f0->_1")
  .groupBy(0, 1)
  .reduceGroup(groupReducer)
  .withForwardedFields("_1")
  .output(outputFormat)

It seems to work well, and the semantic annotation does remove a hash
partition from the execution plan.

Regards,
Alexis.


On Thu, Aug 9, 2018 at 10:27 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Alexis,
>
> The Scala API does not expose a DataSource object but only a Scala DataSet
> which wraps the Java object.
> You can get the SplitDataProperties from the Scala DataSet as follows:
>
> val dbData: DataSet[...] = ???
> val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties
>
> So you first have to get the wrapped Java DataSet, cast it to DataSource
> and then get the properties.
> It's not very nice, but should work.
>
> In order to use SDPs, you should be a bit familiar how physical data
> properties are propagated and discarded in the optimizer.
> For example, applying a simple MapFunction removes all properties because
> the function might have changed the fields on which a DataSet is
> partitioned or sorted.
> You can expose the behavior of a function to the optimizer by using
> Semantic Annotations [1]
>
> Some comments on the code and plan you shared:
> - You might want to add hostname to ORDER BY to have the output grouped by
> (ts, hostname).
> - Check the Global and Local data properties in the plan to validate that
> the SDP were correctly interpreted.
> - If the data is already correctly partitioned and sorted, you might not
> need the Combiners. In either case, you properly want to annotate them with
> Forward Field annoations.
>
> The number of source tasks is unrelated to the number of splits. If you
> have more tasks than splits, some tasks won't process any data.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations
>
>
> 2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thanks for the clarification. I have a few remarks, but let me provide
>> more concrete information. You can find the query I'm using, the
>> JDBCInputFormat creation, and the execution plan in this github gist:
>>
>> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>>
>> I cannot call getSplitDataProperties because env.createInput(inputFormat)
>> returns a DataSet, not a DataSource. In the code, I do this instead:
>>
>> val javaEnv =
>> org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
>> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
>> "example")
>>
>> which feels wrong (the constructor doesn't accept a Scala environment).
>> Is there a better alternative?
>>
>> I see absolutely no difference in the execution plan whether I use SDP or
>> not, so therefore the results are indeed the same. Is this expected?
>>
>> My ParameterValuesProvider specifies 2 splits, yet the execution plan
>> shows Parallelism=24. Even the source code is a bit ambiguous, considering
>> that the constructor for GenericInputSplit takes two parameters:
>> partitionNumber and totalNumberOfPartitions. Should I assume that there are
>> 2 splits divided into 24 partitions?
>>
>> Regards,
>> Alexis.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Alexis,
>>>
>>> First of all, I think you leverage the partitioning and sorting
>>> properties of the data returned by the database using SplitDataProperties.
>>> However, please be aware that SplitDataProperties are a rather
>>> experimental feature.
>>>
>>> If used without query parameters, the JDBCInputFormat generates a single
>>> split and queries the database just once. If you want to leverage
>>> parallelism, you have to specify a query with parameters in the WHERE
>>> clause to read different parts of the table.
>>> Note, depending on the configuration of the database, multiple queries
>>> result in multiple full scans. Hence, it might make sense to have an index
>>> on the partitioning columns.
>>>
>>> If properly configured, the JDBCInputFormat generates multiple splits
>>> which are partitioned. Since the partitioning is encoded in the query, it
>>> is opaque to Flink and must be explicitly declared.
>>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>>> Flink that all records with the same value in the partitioning field are
>>> read from the same split, i.e, the full data is partitioned on the
>>> attribute across splits.
>>> The same can be done for ordering if the queries of the JDBCInputFormat
>>> is specified with an ORDER BY clause.
>>> Partitioning and grouping are two different things. You can define a
>>> query that partitions on hostname and orders by hostname and timestamp and
>>> declare these properties in the SDP.
>>>
>>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>>> In your example this would be source.getSplitDataProperties().
>>>
>>> Whatever you do, you should carefully check the execution plan
>>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
>>> validate that the result are identical whether you use SDP or not.
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/visualizer/
>>>
>>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>>
>>>> Hi everyone,
>>>>
>>>> I have the following scenario: I have a database table with 3 columns:
>>>> a host (string), a timestamp, and an integer ID. Conceptually, what I'd
>>>> like to do is:
>>>>
>>>> group by host and timestamp -> based on all the IDs in each group,
>>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>>> times it appeared across the resulting data
>>>>
>>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>>
>>>> What I'm currently doing is roughly:
>>>>
>>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>>> val source = environment.createInput(inut)
>>>> source.partitionByHash("host",
>>>> "timestamp").mapPartition(...).groupBy(0, 1).aggregate(SUM, 2)
>>>>
>>>> The query given to JDBCInputFormat provides results ordered by host and
>>>> timestamp, and I was wondering if performance can be improved by specifying
>>>> this in the code. I've looked at
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>>>> and
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>>>> but I still have some questions:
>>>>
>>>> - If a split is a subset of a partition, what is the meaning of
>>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a
>>>> split is divided into partitions, meaning that a partition would be a
>>>> subset of a split.
>>>> - At which point can I retrieve and adjust a SplitDataProperties
>>>> instance, if possible at all?
>>>> - If I wanted a coarser parallelization where each slot gets all the
>>>> data for the same host, would I have to manually create the sub-groups
>>>> based on timestamp?
>>>>
>>>> Regards,
>>>> Alexis.
>>>>
>>>>
>>>
>

Re: JDBCInputFormat and SplitDataProperties

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Alexis,

The Scala API does not expose a DataSource object but only a Scala DataSet
which wraps the Java object.
You can get the SplitDataProperties from the Scala DataSet as follows:

val dbData: DataSet[...] = ???
val sdp = dbData.javaSet.asInstanceOf[DataSource].getSplitDataProperties

So you first have to get the wrapped Java DataSet, cast it to DataSource
and then get the properties.
It's not very nice, but should work.

In order to use SDPs, you should be a bit familiar how physical data
properties are propagated and discarded in the optimizer.
For example, applying a simple MapFunction removes all properties because
the function might have changed the fields on which a DataSet is
partitioned or sorted.
You can expose the behavior of a function to the optimizer by using
Semantic Annotations [1]

Some comments on the code and plan you shared:
- You might want to add hostname to ORDER BY to have the output grouped by
(ts, hostname).
- Check the Global and Local data properties in the plan to validate that
the SDP were correctly interpreted.
- If the data is already correctly partitioned and sorted, you might not
need the Combiners. In either case, you properly want to annotate them with
Forward Field annoations.

The number of source tasks is unrelated to the number of splits. If you
have more tasks than splits, some tasks won't process any data.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#
semantic-annotations


2018-08-08 14:10 GMT+02:00 Alexis Sarda <al...@gmail.com>:

> Hi Fabian,
>
> Thanks for the clarification. I have a few remarks, but let me provide
> more concrete information. You can find the query I'm using, the
> JDBCInputFormat creation, and the execution plan in this github gist:
>
> https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d
>
> I cannot call getSplitDataProperties because env.createInput(inputFormat)
> returns a DataSet, not a DataSource. In the code, I do this instead:
>
> val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionE
> nvironment
> val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
> "example")
>
> which feels wrong (the constructor doesn't accept a Scala environment). Is
> there a better alternative?
>
> I see absolutely no difference in the execution plan whether I use SDP or
> not, so therefore the results are indeed the same. Is this expected?
>
> My ParameterValuesProvider specifies 2 splits, yet the execution plan
> shows Parallelism=24. Even the source code is a bit ambiguous, considering
> that the constructor for GenericInputSplit takes two parameters:
> partitionNumber and totalNumberOfPartitions. Should I assume that there are
> 2 splits divided into 24 partitions?
>
> Regards,
> Alexis.
>
>
>
> On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Alexis,
>>
>> First of all, I think you leverage the partitioning and sorting
>> properties of the data returned by the database using SplitDataProperties.
>> However, please be aware that SplitDataProperties are a rather
>> experimental feature.
>>
>> If used without query parameters, the JDBCInputFormat generates a single
>> split and queries the database just once. If you want to leverage
>> parallelism, you have to specify a query with parameters in the WHERE
>> clause to read different parts of the table.
>> Note, depending on the configuration of the database, multiple queries
>> result in multiple full scans. Hence, it might make sense to have an index
>> on the partitioning columns.
>>
>> If properly configured, the JDBCInputFormat generates multiple splits
>> which are partitioned. Since the partitioning is encoded in the query, it
>> is opaque to Flink and must be explicitly declared.
>> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
>> Flink that all records with the same value in the partitioning field are
>> read from the same split, i.e, the full data is partitioned on the
>> attribute across splits.
>> The same can be done for ordering if the queries of the JDBCInputFormat
>> is specified with an ORDER BY clause.
>> Partitioning and grouping are two different things. You can define a
>> query that partitions on hostname and orders by hostname and timestamp and
>> declare these properties in the SDP.
>>
>> You can get a SDP object by calling DataSource.getSplitDataProperties().
>> In your example this would be source.getSplitDataProperties().
>>
>> Whatever you do, you should carefully check the execution plan
>> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1]
>> and validate that the result are identical whether you use SDP or not.
>>
>> Best, Fabian
>>
>> [1] https://flink.apache.org/visualizer/
>>
>> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>>
>>> Hi everyone,
>>>
>>> I have the following scenario: I have a database table with 3 columns: a
>>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>>> to do is:
>>>
>>> group by host and timestamp -> based on all the IDs in each group,
>>> create a mapping to n new tuples -> for each unique tuple, count how many
>>> times it appeared across the resulting data
>>>
>>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>>
>>> What I'm currently doing is roughly:
>>>
>>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>>> val source = environment.createInput(inut)
>>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>>> 1).aggregate(SUM, 2)
>>>
>>> The query given to JDBCInputFormat provides results ordered by host and
>>> timestamp, and I was wondering if performance can be improved by specifying
>>> this in the code. I've looked at http://apache-flink-user-ma
>>> iling-list-archive.2336050.n4.nabble.com/Terminology-Split-
>>> Group-and-Partition-td11030.html and http://apache-flink-user-m
>>> ailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sor
>>> ted-Input-Datasets-td20038.html, but I still have some questions:
>>>
>>> - If a split is a subset of a partition, what is the meaning of
>>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing
>>> that a split is divided into partitions, meaning that a partition would be
>>> a subset of a split.
>>> - At which point can I retrieve and adjust a SplitDataProperties
>>> instance, if possible at all?
>>> - If I wanted a coarser parallelization where each slot gets all the
>>> data for the same host, would I have to manually create the sub-groups
>>> based on timestamp?
>>>
>>> Regards,
>>> Alexis.
>>>
>>>
>>

Re: JDBCInputFormat and SplitDataProperties

Posted by Alexis Sarda <al...@gmail.com>.
Hi Fabian,

Thanks for the clarification. I have a few remarks, but let me provide more
concrete information. You can find the query I'm using, the JDBCInputFormat
creation, and the execution plan in this github gist:

https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat)
returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv =
org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
"example")

which feels wrong (the constructor doesn't accept a Scala environment). Is
there a better alternative?

I see absolutely no difference in the execution plan whether I use SDP or
not, so therefore the results are indeed the same. Is this expected?

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows
Parallelism=24. Even the source code is a bit ambiguous, considering that
the constructor for GenericInputSplit takes two parameters: partitionNumber
and totalNumberOfPartitions. Should I assume that there are 2 splits
divided into 24 partitions?

Regards,
Alexis.



On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Alexis,
>
> First of all, I think you leverage the partitioning and sorting properties
> of the data returned by the database using SplitDataProperties.
> However, please be aware that SplitDataProperties are a rather
> experimental feature.
>
> If used without query parameters, the JDBCInputFormat generates a single
> split and queries the database just once. If you want to leverage
> parallelism, you have to specify a query with parameters in the WHERE
> clause to read different parts of the table.
> Note, depending on the configuration of the database, multiple queries
> result in multiple full scans. Hence, it might make sense to have an index
> on the partitioning columns.
>
> If properly configured, the JDBCInputFormat generates multiple splits
> which are partitioned. Since the partitioning is encoded in the query, it
> is opaque to Flink and must be explicitly declared.
> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
> Flink that all records with the same value in the partitioning field are
> read from the same split, i.e, the full data is partitioned on the
> attribute across splits.
> The same can be done for ordering if the queries of the JDBCInputFormat is
> specified with an ORDER BY clause.
> Partitioning and grouping are two different things. You can define a query
> that partitions on hostname and orders by hostname and timestamp and
> declare these properties in the SDP.
>
> You can get a SDP object by calling DataSource.getSplitDataProperties().
> In your example this would be source.getSplitDataProperties().
>
> Whatever you do, you should carefully check the execution plan
> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
> validate that the result are identical whether you use SDP or not.
>
> Best, Fabian
>
> [1] https://flink.apache.org/visualizer/
>
> 2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:
>
>> Hi everyone,
>>
>> I have the following scenario: I have a database table with 3 columns: a
>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>> to do is:
>>
>> group by host and timestamp -> based on all the IDs in each group, create
>> a mapping to n new tuples -> for each unique tuple, count how many times it
>> appeared across the resulting data
>>
>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>
>> What I'm currently doing is roughly:
>>
>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>> val source = environment.createInput(inut)
>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>> 1).aggregate(SUM, 2)
>>
>> The query given to JDBCInputFormat provides results ordered by host and
>> timestamp, and I was wondering if performance can be improved by specifying
>> this in the code. I've looked at
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>> and
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>> but I still have some questions:
>>
>> - If a split is a subset of a partition, what is the meaning of
>> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that a
>> split is divided into partitions, meaning that a partition would be a
>> subset of a split.
>> - At which point can I retrieve and adjust a SplitDataProperties
>> instance, if possible at all?
>> - If I wanted a coarser parallelization where each slot gets all the data
>> for the same host, would I have to manually create the sub-groups based on
>> timestamp?
>>
>> Regards,
>> Alexis.
>>
>>
>

Re: JDBCInputFormat and SplitDataProperties

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Alexis,

First of all, I think you leverage the partitioning and sorting properties
of the data returned by the database using SplitDataProperties.
However, please be aware that SplitDataProperties are a rather experimental
feature.

If used without query parameters, the JDBCInputFormat generates a single
split and queries the database just once. If you want to leverage
parallelism, you have to specify a query with parameters in the WHERE
clause to read different parts of the table.
Note, depending on the configuration of the database, multiple queries
result in multiple full scans. Hence, it might make sense to have an index
on the partitioning columns.

If properly configured, the JDBCInputFormat generates multiple splits which
are partitioned. Since the partitioning is encoded in the query, it is
opaque to Flink and must be explicitly declared.
This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
Flink that all records with the same value in the partitioning field are
read from the same split, i.e, the full data is partitioned on the
attribute across splits.
The same can be done for ordering if the queries of the JDBCInputFormat is
specified with an ORDER BY clause.
Partitioning and grouping are two different things. You can define a query
that partitions on hostname and orders by hostname and timestamp and
declare these properties in the SDP.

You can get a SDP object by calling DataSource.getSplitDataProperties(). In
your example this would be source.getSplitDataProperties().

Whatever you do, you should carefully check the execution plan
(ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
validate that the result are identical whether you use SDP or not.

Best, Fabian

[1] https://flink.apache.org/visualizer/

2018-08-07 22:32 GMT+02:00 Alexis Sarda <al...@gmail.com>:

> Hi everyone,
>
> I have the following scenario: I have a database table with 3 columns: a
> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
> to do is:
>
> group by host and timestamp -> based on all the IDs in each group, create
> a mapping to n new tuples -> for each unique tuple, count how many times it
> appeared across the resulting data
>
> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>
> What I'm currently doing is roughly:
>
> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
> val source = environment.createInput(inut)
> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
> 1).aggregate(SUM, 2)
>
> The query given to JDBCInputFormat provides results ordered by host and
> timestamp, and I was wondering if performance can be improved by specifying
> this in the code. I've looked at http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Terminology-
> Split-Group-and-Partition-td11030.html and http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-
> Sorted-Input-Datasets-td20038.html, but I still have some questions:
>
> - If a split is a subset of a partition, what is the meaning of
> SplitDataProperties#splitsPartitionedBy? The wording makes me thing that
> a split is divided into partitions, meaning that a partition would be a
> subset of a split.
> - At which point can I retrieve and adjust a SplitDataProperties instance,
> if possible at all?
> - If I wanted a coarser parallelization where each slot gets all the data
> for the same host, would I have to manually create the sub-groups based on
> timestamp?
>
> Regards,
> Alexis.
>
>