Posted to dev@flink.apache.org by Yangze Guo <ka...@gmail.com> on 2020/03/26 09:51:00 UTC

[DISCUSS] FLIP-118: Improve Flink’s ID system

Hi everyone,

We would like to start a discussion thread on "FLIP-118: Improve
Flink’s ID system"[1].

This FLIP mainly discusses the following issues, with the goal of enhancing the
readability of IDs in logs and helping users debug in case of failures:

- Enhance the readability of the string literals of IDs. Most of them
are hashcodes, e.g. ExecutionAttemptID, which do not provide much
meaningful information and are hard for users to recognize and compare.
- Log the ID’s lineage information to make debugging more convenient.
Currently, the logs do not always show the lineage between IDs. Finding
out the relationships between entities identified by given IDs is a
common need, e.g., which slot (identified by its AllocationID) was
assigned to satisfy the slot request with a given SlotRequestID. Without
such lineage information, it is currently impossible to track the
end-to-end lifecycle of an Execution or a Task, which makes debugging
difficult.

Key changes proposed in the FLIP are as follows:

- Add location information to distributed components
- Add topology information to graph components
- Log the ID’s lineage information
- Expose the identifiers of distributed components to users
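To make "location information" a little more concrete, here is a minimal sketch of a resource ID string that embeds a TaskExecutor's host and port plus a short random suffix. The class `ReadableResourceIds`, its method, and the `host:port-suffix` format are hypothetical illustrations, not the format the FLIP defines.

```java
import java.util.UUID;

/**
 * Hypothetical helper, not a Flink API: builds a readable resource ID of the
 * form "host:port-suffix". The random suffix keeps IDs distinct when a new
 * process later reuses the same host and port.
 */
final class ReadableResourceIds {

    private ReadableResourceIds() {}

    static String create(String host, int port) {
        // A random UUID string starts with 8 lowercase hex digits; keep the first 6.
        String suffix = UUID.randomUUID().toString().substring(0, 6);
        return host + ":" + port + "-" + suffix;
    }

    public static void main(String[] args) {
        // Prints "worker-3.example.com:43521-" followed by 6 random hex digits.
        System.out.println(create("worker-3.example.com", 43521));
    }
}
```

A 6-character suffix trades some collision resistance for readability in logs; where uniqueness matters more, a full UUID suffix would be the safer choice.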

Please find more details in the FLIP wiki document [1]. Looking forward to
your feedback.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521

Best,
Yangze Guo

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
To keep the VOTE thread clean, I'm forwarding @Zhijiang's reminder here:

> Sorry for not being involved in the previous discussion. In general, I like the proposed direction of giving related IDs richer information for debugging and correlation.

> But I have a small reminder about the changes. After a failover restart, all related IDs are regenerated randomly and therefore differ from the ones before. I am not sure whether this was an intentional design, but it is the ground truth now.

> E.g. ExecutionAttemptID is regenerated, and ResultPartitionID is also derived from it to guarantee uniqueness after failover. Based on the current implementation, it seems that we do not store or rely on the previous IDs after failover, but I am not sure whether this assumption holds for future features.

> Fortunately, the changes proposed in this FLIP do not break this property, since they introduce `attemptNumber` into `ExecutionAttemptID`, so we do not need to consider this issue further now.

Thanks for the reminder. I agree that the change should not break the
failover-related rule. If some contracts in the future need different
IDs across failovers, we will need to rethink the structure of those IDs.
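The property discussed above can be illustrated with a minimal sketch of a composed attempt ID. `SketchExecutionAttemptId` and its fields are hypothetical stand-ins (a plain string replaces JobVertexID), not Flink's classes. The sketch only shows why including `attemptNumber` keeps IDs unique across failovers: a restarted attempt never equals the attempt it replaces, so any ID derived from it changes as well.

```java
import java.util.Objects;

/**
 * Hypothetical composed attempt ID: which vertex, which subtask, which attempt.
 * The vertex ID is a plain String here instead of a real JobVertexID.
 */
final class SketchExecutionAttemptId {
    private final String jobVertexId;
    private final int subtaskIndex;
    private final int attemptNumber; // incremented each time the subtask is restarted

    SketchExecutionAttemptId(String jobVertexId, int subtaskIndex, int attemptNumber) {
        this.jobVertexId = Objects.requireNonNull(jobVertexId);
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
    }

    /** ID of the next attempt of the same subtask, e.g. after a failover. */
    SketchExecutionAttemptId nextAttempt() {
        return new SketchExecutionAttemptId(jobVertexId, subtaskIndex, attemptNumber + 1);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchExecutionAttemptId)) return false;
        SketchExecutionAttemptId that = (SketchExecutionAttemptId) o;
        return subtaskIndex == that.subtaskIndex
                && attemptNumber == that.attemptNumber
                && jobVertexId.equals(that.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobVertexId, subtaskIndex, attemptNumber);
    }

    @Override
    public String toString() {
        // Readable form: vertex, subtask index, attempt number.
        return jobVertexId + "_" + subtaskIndex + "_" + attemptNumber;
    }

    public static void main(String[] args) {
        SketchExecutionAttemptId first = new SketchExecutionAttemptId("bc764cd8", 3, 0);
        SketchExecutionAttemptId restarted = first.nextAttempt();
        // Prints "bc764cd8_3_0 -> bc764cd8_3_1, equal: false"
        System.out.println(first + " -> " + restarted + ", equal: " + first.equals(restarted));
    }
}
```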

Best,
Yangze Guo

On Wed, Apr 15, 2020 at 5:09 PM Yangze Guo <ka...@gmail.com> wrote:
>
> Thanks for the feedback, @ZhuZhu and @Till.
> Thanks for the deeper analysis of the TDD size, @ZhuZhu.
>
> If there are no further concerns in the next 24 hours, I'll start a vote
> for this FLIP.
>
> Best,
> Yangze Guo
>
> On Wed, Apr 15, 2020 at 4:56 PM Till Rohrmann <tr...@apache.org> wrote:
> >
> > This is good news, Yangze. Decreasing the size of our IDs is a really nice side effect :-)
> >
> > Hence, +1 from my side as well.
> >
> > Cheers,
> > Till
> >
> > On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <re...@gmail.com> wrote:
> >>
> >> Thanks for doing this benchmark, @Yangze Guo <ka...@gmail.com>.
> >> The result looks promising, so I would +1 refactoring ExecutionAttemptID
> >> and IntermediateResultPartitionID.
> >>
> >> Regarding why 'The size of TDD after serialization become smaller than
> >> before', I guess it's because the new IntermediateResultPartitionIDs can
> >> share the same IntermediateDataSetID instance. In this way, each
> >> IntermediateResultPartitionID only takes a reference (to the shared
> >> IntermediateDataSetID) and an int (the index), which is smaller than the
> >> 2 longs of an AbstractID.
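Zhu Zhu's explanation relies on Java serialization writing each object only once per stream and encoding later occurrences as short back-references. The sketch below reproduces that effect with hypothetical stand-in classes (`TwoLongId` for a 2-long AbstractID, `ComposedId` for the proposed layout). It makes no claim about how Flink actually serializes its ID classes or the TDD.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for the old and the proposed partition ID layouts. */
final class SharedReferenceSizeDemo {

    /** Two longs, like an AbstractID (old partition ID, or the shared data set ID). */
    static final class TwoLongId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upper;
        final long lower;

        TwoLongId(long upper, long lower) {
            this.upper = upper;
            this.lower = lower;
        }
    }

    /** Proposed layout: a reference to a shared data set ID plus an int index. */
    static final class ComposedId implements Serializable {
        private static final long serialVersionUID = 1L;
        final TwoLongId dataSetId;
        final int partitionNum;

        ComposedId(TwoLongId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Number of bytes Java serialization produces for {@code o}. */
    static int serializedSize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    /** Old style: n independent IDs, each carrying its own two longs. */
    static List<TwoLongId> flatIds(int n) {
        List<TwoLongId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new TwoLongId(42L, i));
        }
        return ids;
    }

    /** New style: n IDs sharing one data set ID instance. */
    static List<ComposedId> composedIds(int n) {
        TwoLongId shared = new TwoLongId(42L, 7L); // written once; later uses become back-references
        List<ComposedId> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(new ComposedId(shared, i));
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println("flat:     " + serializedSize(flatIds(1000)) + " bytes");
        System.out.println("composed: " + serializedSize(composedIds(1000)) + " bytes");
    }
}
```

The composed list comes out smaller because, after the first element, each `ComposedId` costs only its int field plus a back-reference handle instead of a fresh pair of longs.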
> >>
> >> Thanks,
> >> Zhu Zhu
> >>
> >> Yangze Guo <ka...@gmail.com> wrote on Tue, Apr 14, 2020 at 3:09 PM:
> >>
> >> > Hi everyone,
> >> >
> >> > I've investigated the impact on jobs with higher parallelism.
> >> >
> >> > The results show:
> >> > - The serialized TDD is smaller than before, and I did not run into
> >> > any issue with the Akka framework with the parallelism set to 6000.
> >> > - There was no significant difference in end-to-end scheduling time,
> >> > job runtime, young GC count, or total full GC time.
> >> >
> >> > For details, please take a look at
> >> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >> >
> >> > From my perspective, I think it might be OK to refactor
> >> > ExecutionAttemptID and IntermediateResultPartitionID. If you have
> >> > further concerns or think we should investigate further, please
> >> > let me know.
> >> >
> >> > Best,
> >> > Yangze Guo
> >> >
> >> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <ka...@gmail.com> wrote:
> >> > >
> >> > > Hi everyone,
> >> > > After an offline discussion with ZhuZhu, I have some comments on this
> >> > > investigation.
> >> > >
> >> > > Regarding the maximum parallelism dropping from 760 to 685, this may be
> >> > > because the tasks were not scheduled evenly. The result was inconsistent
> >> > > across multiple experiments, so this phenomenon is likely unrelated to
> >> > > our changes.
> >> > >
> >> > > I think what we really care about is the Akka frame size (i.e., whether
> >> > > users need to enlarge it for the same job after our change). The
> >> > > serialized TDD seems to be smaller after the change. I don't know the
> >> > > root cause of this at the moment. This is how I measure it:
> >> > > ```
> >> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> >> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
> >> > > oos.writeObject(deployment);
> >> > > oos.flush();
> >> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> >> > > ```
> >> > > Please correct me if I'm wrong.
> >> > >
> >> > > I'll experiment with higher parallelism to see if there is any
> >> > > regression regarding the Akka frame size.
> >> > >
> >> > > Regarding the TDD building time, the parallelism in my investigation
> >> > > might have been too small. Meanwhile, this time can be influenced by
> >> > > many factors. Thus, I'll
> >> > > - experiment with higher parallelism.
> >> > > - measure the time spent from "Starting scheduling" until the last task
> >> > > switches to RUNNING.
> >> > >
> >> > > Best,
> >> > > Yangze Guo
> >> > >
> >> > >
> >> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <ka...@gmail.com> wrote:
> >> > > >
> >> > > > Hi there,
> >> > > >
> >> > > > Sorry for the belated reply. I have made a preliminary investigation
> >> > > > of the impact of refactoring IntermediateResultPartitionID.
> >> > > >
> >> > > > The key change is composing it of an IntermediateDataSetID and a
> >> > > > partitionNum:
> >> > > > public class IntermediateResultPartitionID {
> >> > > >    private final IntermediateDataSetID intermediateDataSetID;
> >> > > >    private final int partitionNum;
> >> > > > }
> >> > > >
> >> > > > In this benchmark, I used examples/streaming/WordCount.jar as the test
> >> > > > job and ran Flink on YARN. All configurations were kept at their
> >> > > > defaults except for "taskmanager.numberOfTaskSlots", which was set to 2.
> >> > > >
> >> > > > The results show that it does have an impact on performance:
> >> > > > - After this change, the maximum parallelism went from 760 to 685,
> >> > > > limited by the total number of network buffers. For the same
> >> > > > parallelism, users need more network buffers than before.
> >> > > > - The Netty messages "PartitionRequest" and "TaskEventRequest" grow
> >> > > > by 4 bytes. For "PartitionRequest", this is a 7% increase.
> >> > > > - The TDD takes longer to construct. With a parallelism of 600, it
> >> > > > takes twice as long to construct the TDD as before.
> >> > > >
> >> > > > Details are recorded in
> >> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >> > > >
> >> > > > The same issue could happen with ExecutionAttemptID, which would
> >> > > > increase "PartitionRequest" and "TaskEventRequest" by 8 bytes
> >> > > > (subtaskIndex and attemptNumber). But it may not affect the TDD as much
> >> > > > as IntermediateResultPartitionID, since there is only one
> >> > > > ExecutionAttemptID per TDD.
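The 4-byte and 8-byte growth quoted above follows from the field widths. This back-of-the-envelope sketch assumes each ID is written as raw fixed-width fields (longs and ints via `java.nio.ByteBuffer`) with no extra header, and that the proposed ExecutionAttemptID is a 2-long vertex ID plus `subtaskIndex` and `attemptNumber`. The class `IdWireSizes` is hypothetical, and Flink's actual Netty encoding may differ in detail.

```java
import java.nio.ByteBuffer;

/** Hypothetical wire layouts, assuming raw big-endian fields and no headers. */
final class IdWireSizes {

    /** Old layout of both IDs: an AbstractID, i.e. two longs. */
    static byte[] abstractId(long upper, long lower) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(upper).putLong(lower)
                .array();
    }

    /** Proposed IntermediateResultPartitionID: data set ID (two longs) + partitionNum. */
    static byte[] partitionId(long upper, long lower, int partitionNum) {
        return ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES)
                .putLong(upper).putLong(lower).putInt(partitionNum)
                .array();
    }

    /** Assumed ExecutionAttemptID: vertex ID (two longs) + subtaskIndex + attemptNumber. */
    static byte[] attemptId(long upper, long lower, int subtaskIndex, int attemptNumber) {
        return ByteBuffer.allocate(2 * Long.BYTES + 2 * Integer.BYTES)
                .putLong(upper).putLong(lower).putInt(subtaskIndex).putInt(attemptNumber)
                .array();
    }

    public static void main(String[] args) {
        int before = abstractId(1L, 2L).length; // 16 bytes
        // Prints "partition ID: +4 bytes"
        System.out.println("partition ID: +" + (partitionId(1L, 2L, 0).length - before) + " bytes");
        // Prints "attempt ID: +8 bytes"
        System.out.println("attempt ID: +" + (attemptId(1L, 2L, 0, 0).length - before) + " bytes");
    }
}
```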
> >> > > >
> >> > > > After that preliminary investigation, I tend not to refactor
> >> > > > ExecutionAttemptID and IntermediateResultPartitionID at the moment, or
> >> > > > to treat it as future work.
> >> > > >
> >> > > > WDYT? @ZhuZhu @Till
> >> > > >
> >> > > > Best,
> >> > > > Yangze Guo
> >> > > >
> >> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
> >> > > > >
> >> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
> >> > > > > You are right. JobVertexID is widely used, and reworking it may
> >> > > > > affect public interfaces, e.g. the REST API. We can take it as a
> >> > > > > long-term goal and exclude it from this FLIP.
> >> > > > > The same applies to IntermediateDataSetID, which could also be
> >> > > > > composed of a JobID and an index, as Till proposed.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Zhu Zhu
> >> > > > >
> >> > > > > Till Rohrmann <tr...@apache.org> wrote on Tue, Mar 31, 2020 at 8:36 PM:
> >> > > > >
> >> > > > > > For the IntermediateDataSetID, I was just thinking that it might
> >> > > > > > actually be interesting to know which job produced the result,
> >> > > > > > which, by using cluster partitions, could be used across different
> >> > > > > > jobs. Not saying that we have to do it, though.
> >> > > > > >
> >> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: there is
> >> > > > > > already an issue [1] for the problem of too large TDDs. The current
> >> > > > > > suspicion is that the size of TDDs for jobs with a large parallelism
> >> > > > > > can indeed become problematic for Flink. Hence, it would be great to
> >> > > > > > investigate the impact of the proposed changes.
> >> > > > > >
> >> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> >> > > > > >
> >> > > > > > Cheers,
> >> > > > > > Till
> >> > > > > >
> >> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <ka...@gmail.com> wrote:
> >> > > > > >
> >> > > > > > > Hi, Zhu,
> >> > > > > > >
> >> > > > > > > Thanks for the feedback.
> >> > > > > > >
> >> > > > > > > > make JobVertexID a composition of JobID and a topology index
> >> > > > > > > I think it is a good idea. However, it seems the JobVertexID is
> >> > > > > > > derived from a hash code, which is used to identify vertices
> >> > > > > > > across submissions. I'm not familiar with that component, though.
> >> > > > > > > I'd prefer to keep this idea out of the scope of this FLIP unless
> >> > > > > > > someone can help us figure it out.
> >> > > > > > >
> >> > > > > > > > How about we still keep IntermediateDataSetID independent from
> >> > > > > > > > JobVertexID, but just print the producing relationships in logs?
> >> > > > > > > > I think keeping IntermediateDataSetID independent may be better
> >> > > > > > > > considering the cross-job result usages in interactive query cases.
> >> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent
> >> > > > > > > from JobVertexID.
> >> > > > > > >
> >> > > > > > > > The new IDs will become larger with this rework.
> >> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to
> >> > > > > > > provide one during the implementation phase.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Yangze Guo
> >> > > > > > >
> >> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <re...@gmail.com> wrote:
> >> > > > > > > >
> >> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the
> >> > > > > > > > overall proposal. It can help a lot in troubleshooting.
> >> > > > > > > >
> >> > > > > > > > Here are a few questions about it:
> >> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> >> > > > > > > > index? This would help in the session cluster case, so that we can
> >> > > > > > > > identify which tasks belong to which jobs, along with the rework of
> >> > > > > > > > ExecutionAttemptID.
> >> > > > > > > >
> >> > > > > > > > 2. You mentioned "Add the producer info to the string literal of
> >> > > > > > > > IntermediateDataSetID". Do you mean making IntermediateDataSetID a
> >> > > > > > > > composition of JobVertexID and a consumer index?
> >> > > > > > > > How about we still keep IntermediateDataSetID independent from
> >> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
> >> > > > > > > > think keeping IntermediateDataSetID independent may be better
> >> > > > > > > > considering the cross-job result usages in interactive query cases.
> >> > > > > > > >
> >> > > > > > > > 3. The new IDs will become larger with this rework. The
> >> > > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> >> > > > > > > > composed of a variety of IDs. I'm not sure how much larger it would
> >> > > > > > > > be, but it could mean more memory and CPU cost, resulting in more
> >> > > > > > > > frequent GCs, messages exceeding the Akka frame size limit, and
> >> > > > > > > > longer blocking of the main thread.
> >> > > > > > > > This should not be a problem in most cases but might be one for
> >> > > > > > > > large-scale jobs. Shall we have a benchmark for it?
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > Zhu Zhu
> >> > > > > > > >
> >> > > > > > > > Yangze Guo <ka...@gmail.com> wrote on Tue, Mar 31, 2020 at 2:19 PM:
> >> > > > > > > >
> >> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> >> > > > > > > > >
> >> > > > > > > > > @Till
> >> > > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the
> >> > > > > > > > > scope of this FLIP since the pipelined region scheduling is
> >> > > > > > > > > ongoing work now.
> >> > > > > > > > > I also agree that we should not compose the InstanceID in
> >> > > > > > > > > TaskExecutorConnection of the ResourceID plus a monotonically
> >> > > > > > > > > increasing value. Thanks a lot for your explanation.
> >> > > > > > > > >
> >> > > > > > > > > @Konstantin @Yang
> >> > > > > > > > > Regarding the PodName of the TaskExecutor on K8s, I second Yang's
> >> > > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID
> >> > > > > > > > > and make the TM respect it. Users need to guarantee that there
> >> > > > > > > > > are no collisions between different TMs.
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Yangze Guo
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz3wu@gmail.com> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > +1 on allowing a user-defined resourceId for the TaskManager
> >> > > > > > > > > >
> >> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey.wy@gmail.com> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Hi Konstantin,
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think it is a good idea. Currently, our users also report a
> >> > > > > > > > > > > similar issue with the resourceId of standalone clusters. When
> >> > > > > > > > > > > we start a standalone cluster now, the `TaskManagerRunner`
> >> > > > > > > > > > > always generates a UUID for the resourceId. It is used to
> >> > > > > > > > > > > register with the JobManager, and it is not convenient to match
> >> > > > > > > > > > > it with the real TaskManager, especially in container
> >> > > > > > > > > > > environments.
> >> > > > > > > > > > >
> >> > > > > > > > > > > I think a possible solution is to support a user-defined
> >> > > > > > > > > > > resourceId. We could get it from the environment. For
> >> > > > > > > > > > > standalone on K8s, we could set the "RESOURCE_ID" env to the pod
> >> > > > > > > > > > > name so that it is easier to match the TaskManager with its K8s
> >> > > > > > > > > > > pod.
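Yang's suggestion could look roughly like the following minimal sketch: prefer a user-provided `RESOURCE_ID` environment variable and fall back to a random UUID otherwise. `ResourceIdResolver` is hypothetical; only the variable name comes from this thread, and this is not an existing Flink option. Passing the environment in as a `Map` keeps the logic testable.

```java
import java.util.Map;
import java.util.UUID;

/** Hypothetical resolver: user-defined resource ID from the environment, else a random UUID. */
final class ResourceIdResolver {

    static final String ENV_KEY = "RESOURCE_ID";

    private ResourceIdResolver() {}

    static String resolve(Map<String, String> env) {
        String userDefined = env.get(ENV_KEY);
        if (userDefined != null && !userDefined.trim().isEmpty()) {
            // The user is responsible for keeping this unique across TaskManagers.
            return userDefined.trim();
        }
        // Fallback matching the current behaviour described above: a random UUID.
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve(System.getenv()));
    }
}
```

On Kubernetes, the pod name could be injected into such a variable with the Downward API (`valueFrom.fieldRef.fieldPath: metadata.name`), which sidesteps the fact that pod names are generated by K8s.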
> >> > > > > > > > > > >
> >> > > > > > > > > > > Moreover, I am afraid we cannot set the pod name to the
> >> > > > > > > > > > > resourceId. You can only set "deployment.meta.name", since the
> >> > > > > > > > > > > pod name is generated by K8s following the pattern
> >> > > > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we would set
> >> > > > > > > > > > > the resourceId to the pod name.
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best,
> >> > > > > > > > > > > Yang
> >> > > > > > > > > > >
> >> > > > > > > > > > > Konstantin Knauf <ko...@ververica.com> wrote on Sun, Mar 29, 2020 at 8:06 PM:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Hi Yangze, Hi Till,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > thank you for working on this topic. I believe it will make
> >> > > > > > > > > > > > debugging large Apache Flink deployments much more feasible.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I was wondering whether it would make sense to allow the user
> >> > > > > > > > > > > > to specify the Resource ID in standalone setups. For example,
> >> > > > > > > > > > > > many users still implicitly use standalone clusters on
> >> > > > > > > > > > > > Kubernetes (the native support is still experimental), and in
> >> > > > > > > > > > > > these cases it would be interesting to also set the PodName as
> >> > > > > > > > > > > > the ResourceID. What do you think?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Cheers,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Konstantin
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrmann@apache.org> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Hi Yangze,
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > thanks for creating this FLIP. I think it is a very good
> >> > > > > > > > > > > > > improvement that helps our users and ourselves better
> >> > > > > > > > > > > > > understand what's going on in Flink.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a
> >> > > > > > > > > > > > > good idea.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Deriving ExecutionGraph IDs from their superset ID is also a
> >> > > > > > > > > > > > > good idea.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make
> >> > > > > > > > > > > > > it a composition of the ResourceID + a monotonically
> >> > > > > > > > > > > > > increasing number. The problem is that in case of an RM
> >> > > > > > > > > > > > > failure, the InstanceIDs would start from 0 again, and this
> >> > > > > > > > > > > > > could lead to collisions.
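Till's fencing concern can be shown with a minimal sketch. `CounterBasedRm` models a ResourceManager that builds InstanceIDs as a ResourceID plus an in-memory counter; both it and `InstanceIdFencingDemo` are hypothetical. Because the counter is lost on failover, a fresh RM hands out the same ID again, so a stale registration can no longer be told apart from a new one, whereas randomly generated IDs do not depend on RM-local state.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration of why "ResourceID + counter" is unsafe for fencing. */
final class InstanceIdFencingDemo {

    /** One ResourceManager incarnation; its counter lives only in memory. */
    static final class CounterBasedRm {
        private final AtomicLong counter = new AtomicLong();

        String register(String resourceId) {
            return resourceId + "-" + counter.getAndIncrement();
        }
    }

    /** Random IDs are independent of any RM-local state. */
    static String randomInstanceId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String beforeFailover = new CounterBasedRm().register("tm-1");
        String afterFailover = new CounterBasedRm().register("tm-1"); // new RM, counter reset to 0
        // Prints "tm-1-0 vs tm-1-0": the two registrations collide.
        System.out.println(beforeFailover + " vs " + afterFailover);
    }
}
```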
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Logging more information on how the different runtime IDs are
> >> > > > > > > > > > > > > correlated is also a good idea.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Two other ideas for simplifying the IDs are the following:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a
> >> > > > > > > > > > > > > separate RpcEndpoint a while ago. With this no longer being
> >> > > > > > > > > > > > > the case, I think we could remove the SlotRequestID and replace
> >> > > > > > > > > > > > > it with the AllocationID.
> >> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi-task slots,
> >> > > > > > > > > > > > > one could derive them from the SlotRequestID used for
> >> > > > > > > > > > > > > requesting the underlying AllocatedSlot.
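Till's second idea, deriving per-task request IDs from the request ID of the underlying AllocatedSlot, could be sketched as follows. `DerivedSlotRequestIds` and the `parent/index` format are hypothetical; the point is only that each derived ID carries its parent, so the lineage can be recovered from the ID itself.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch, not Flink code: request IDs for the tasks sharing one
 * slot are derived from the request ID of the underlying allocated slot.
 */
final class DerivedSlotRequestIds {
    private final String parentRequestId;
    private final AtomicInteger nextIndex = new AtomicInteger();

    DerivedSlotRequestIds(String parentRequestId) {
        this.parentRequestId = parentRequestId;
    }

    /** Returns "parent/0", "parent/1", ... in call order. */
    String nextDerivedId() {
        return parentRequestId + "/" + nextIndex.getAndIncrement();
    }

    /** Recovers the parent request ID from a derived ID. */
    static String parentOf(String derivedId) {
        int slash = derivedId.lastIndexOf('/');
        return slash < 0 ? derivedId : derivedId.substring(0, slash);
    }

    public static void main(String[] args) {
        DerivedSlotRequestIds ids = new DerivedSlotRequestIds(UUID.randomUUID().toString());
        String first = ids.nextDerivedId();
        System.out.println(first + " -> parent " + parentOf(first));
    }
}
```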
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Given that the slot sharing logic will most likely be reworked
> >> > > > > > > > > > > > > with the pipelined region scheduling, we might be able to
> >> > > > > > > > > > > > > resolve these two points as part of the pipelined region
> >> > > > > > > > > > > > > scheduling effort.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Cheers,
> >> > > > > > > > > > > > > Till
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > --
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Konstantin Knauf | Head of Product
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > +49 160 91394525
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Thanks for the feedback, @ZhuZhu and @Till.
Thanks for the deeper analysis of the TDD size, @ZhuZhu.

If there are no further concerns in the next 24 hours, I'll start a vote
for this FLIP.

Best,
Yangze Guo

On Wed, Apr 15, 2020 at 4:56 PM Till Rohrmann <tr...@apache.org> wrote:
>
> This is good news Yangze. Decreasing the size of our id's is a really nice side effect :-)
>
> Hence, +1 from my side as well.
>
> Cheers,
> Till
>
> On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <re...@gmail.com> wrote:
>>
>> Thanks for doing this benchmark @Yangze Guo <ka...@gmail.com> .
>> The result looks promising. So I would +1 to refactor ExecutionAttemptID
>> and IntermediateResultPartitionID.
>>
>> Regarding why 'The size of TDD after serialization become smaller than
>> before',  I guess it's because the new IntermediateResultPartitionIDs can
>> share the same IntermediateDataSetID, in this way the space of
>> IntermediateResultPartitionID is a ref (to IntermediateDataSetID) and an
>> int (index), which is smaller than 2 Longs (AbstractID).
>>
>> Thanks,
>> Zhu Zhu
>>
>> Yangze Guo <ka...@gmail.com> 于2020年4月14日周二 下午3:09写道:
>>
>> > Hi everyone,
>> >
>> > I've investigated the infect with higher parallelism jobs.
>> >
>> > The result shows:
>> > - The size of TDD after serialization become smaller than before.
>> > While I did not meet any issue with Akka framework when the
>> > parallelism set to 6000.
>> > - There was no significant difference regarding the end to end
>> > schedule time, job runtime, young gc count and total full gc time.
>> >
>> > For details, please take a look at
>> >
>> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>> > .
>> >
>> > From my perspective, I think it might be ok to refactor
>> > ExecutionAttemptID and IntermediateResultPartitionID. If you have
>> > further concerns or think we should make further investigation. Please
>> > let me know.
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <ka...@gmail.com> wrote:
>> > >
>> > > Hi everyone,
>> > > After an offline discussion with ZhuZhu, I have some comments on this
>> > > investigation.
>> > >
>> > > Regarding the maximum parallelism went from 760 to 685, it may because
>> > > of that the tasks are not scheduled evenly. The result is inconsistent
>> > > in multiple experiments. So, this phenomenon would be irrelevant to
>> > > our changes.
>> > >
>> > > I think what we really care about is the framesize for Akka(Should
>> > > user enlarge it after our change for the same job). The size of TDD
>> > > after serialization seems to be smaller after change. I don't know the
>> > > root reason of this phenomenon at the moment. The way I measure it is:
>> > > ```
>> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
>> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
>> > > oos.writeObject(deployment);
>> > > oos.flush();
>> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
>> > > ```
>> > > Please correct me if I'm wrong.
>> > >
>> > > I'll experiment with higher parallelism to see if there is any
>> > > regression regarding Akka framesize.
>> > >
>> > > Regarding the TDD building time, the parallelism in my investigation
>> > > might be too small. Meanwhile, this time might be influence by many
>> > > factors. Thus, I'll
>> > > - experiment with higher parallelism.
>> > > - measure the time spent from "Starting scheduling" to the last task
>> > > change state to running.
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > >
>> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <ka...@gmail.com> wrote:
>> > > >
>> > > > Hi there,
>> > > >
>> > > > Sorry for the belated reply. I just make a preliminary investigation
>> > > > of the infect of refactoring IntermediateResultPartitionID.
>> > > >
>> > > > The key change is making it being composed of IntermediateDataSetID
>> > > > and a partitionNum.
>> > > > public class IntermediateResultPartitionID {
>> > > >    private final IntermediateDataSetID intermediateDataSetID;
>> > > >    private final int partitionNum;
>> > > > }
>> > > >
>> > > > In this benchmark, I use examples/streaming/WordCount.jar as the test
>> > > > job and run Flink on Yarn. All the configurations are kept default
>> > > > except for "taskmanager.numberOfTaskSlots", which is set to 2.
>> > > >
>> > > > The result shows it does have an impact on performance.
>> > > > - After this change, the maximum parallelism went from 760 to 685,
>> > > > which limited by the total number of network buffers. For the same
>> > > > parallelism, user needs more network buffer than before.
>> > > > - The netty message "PartitionRequest" and "TaskEventRequest" increase
>> > > > by 4 bytes. For "PartitionRequest", it means 7% increase.
>> > > > - The TDD takes longer to construct. With 600 parallelisms, it takes
>> > > > twice as long to construct TDD than before.
>> > > >
>> > > > Details are recorded in
>> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
>> > > >
>> > > > The same issue could happen with ExecutionAttemptID, which would increase
>> > > > "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex
>> > > > and attemptNumber). However, it may not affect the TDD as much as
>> > > > IntermediateResultPartitionID, since there is only one
>> > > > ExecutionAttemptID per TDD.
>> > > >
>> > > > Based on this preliminary investigation, I tend not to refactor
>> > > > ExecutionAttemptID and IntermediateResultPartitionID for now, and to
>> > > > treat it as future work.
>> > > >
>> > > > WDYT? @ZhuZhu @Till
>> > > >
>> > > > Best,
>> > > > Yangze Guo
>> > > >
>> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
>> > > > >
>> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
>> > > > > You are right. JobVertexID is widely used, and reworking it may affect
>> > > > > public interfaces, e.g. the REST API. We can take it as a long-term goal
>> > > > > and exclude it from this FLIP.
>> > > > > The same applies to IntermediateDataSetID, which could also be composed
>> > > > > of a JobID and an index, as Till proposed.
>> > > > >
>> > > > > Thanks,
>> > > > > Zhu Zhu
>> > > > >
>> > > > > Till Rohrmann <tr...@apache.org> wrote on Tue, Mar 31, 2020 at 8:36 PM:
>> > > > >
>> > > > > > For the IntermediateDataSetID, I was just thinking that it might
>> > > > > > actually be interesting to know which job produced the result, which,
>> > > > > > by using cluster partitions, could be used across different jobs. Not
>> > > > > > saying that we have to do it, though.
>> > > > > >
>> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: for the problem
>> > > > > > of too large TDDs there is already an issue [1]. The current suspicion
>> > > > > > is that the size of TDDs for jobs with a large parallelism can indeed
>> > > > > > become problematic for Flink. Hence, it would be great to investigate
>> > > > > > the impact of the proposed changes.
>> > > > > >
>> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Till
>> > > > > >
>> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <ka...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi, Zhu,
>> > > > > > >
>> > > > > > > Thanks for the feedback.
>> > > > > > >
>> > > > > > > > make JobVertexID a composition of JobID and a topology index
>> > > > > > > I think it is a good idea. However, it seems that JobVertexID is
>> > > > > > > derived from a hash code, which is used to identify vertices across
>> > > > > > > submissions. I'm not familiar with that component, though. I prefer to
>> > > > > > > keep this idea out of the scope of this FLIP unless someone can help us
>> > > > > > > figure it out.
>> > > > > > >
>> > > > > > > > How about we still keep IntermediateDataSetID independent from
>> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
>> > > > > > > > think keeping IntermediateDataSetID independent may be better
>> > > > > > > > considering the cross job result usages in interactive query cases.
>> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent
>> > > > > > > from JobVertexID.
>> > > > > > >
>> > > > > > > > The new IDs will become larger with this rework.
>> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to
>> > > > > > > provide one during the implementation phase.
>> > > > > > >
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Yangze Guo
>> > > > > > >
>> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <re...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall
>> > > > > > > > proposal. It can help a lot in troubleshooting.
>> > > > > > > >
>> > > > > > > > Here are a few questions about it:
>> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
>> > > > > > > > index? This would help in the session cluster case, so that we can
>> > > > > > > > identify which tasks are from which jobs, along with the rework of
>> > > > > > > > ExecutionAttemptID.
>> > > > > > > >
>> > > > > > > > 2. You mentioned "Add the producer info to the string literal of
>> > > > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
>> > > > > > > > composition of JobVertexID and a consumer index?
>> > > > > > > > How about we still keep IntermediateDataSetID independent from
>> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
>> > > > > > > > think keeping IntermediateDataSetID independent may be better
>> > > > > > > > considering the cross-job result usages in interactive query cases.
>> > > > > > > >
>> > > > > > > > 3. The new IDs will become larger with this rework. The
>> > > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
>> > > > > > > > composed of a variety of IDs. I'm not sure how much larger it would
>> > > > > > > > be, but there can be more memory and CPU cost for it, resulting in
>> > > > > > > > more frequent GCs, message sizes exceeding the Akka frame size limit,
>> > > > > > > > and longer blocking of the main thread.
>> > > > > > > > This should not be a problem in most cases but might be a problem for
>> > > > > > > > large-scale jobs. Shall we have a benchmark for it?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Zhu Zhu
>> > > > > > > >
>> > > > > > > > Yangze Guo <ka...@gmail.com> wrote on Tue, Mar 31, 2020 at 2:19 PM:
>> > > > > > > >
>> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
>> > > > > > > > >
>> > > > > > > > > @Till
>> > > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the scope
>> > > > > > > > > of this FLIP since the pipelined region scheduling is ongoing work.
>> > > > > > > > > I also agree that we should not compose the InstanceID in
>> > > > > > > > > TaskExecutorConnection of the ResourceID plus a monotonically
>> > > > > > > > > increasing value. Thanks a lot for your explanation.
>> > > > > > > > >
>> > > > > > > > > @Konstantin @Yang
>> > > > > > > > > Regarding the PodName of the TaskExecutor on K8s, I second Yang's
>> > > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
>> > > > > > > > > make the TM respect it. Users need to guarantee that there are no
>> > > > > > > > > collisions between different TMs.
>> > > > > > > > >
>> > > > > > > > > Best,
>> > > > > > > > > Yangze Guo
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > +1 on allowing a user-defined resourceId for the TaskManager
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey.wy@gmail.com> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Konstantin,
>> > > > > > > > > > >
>> > > > > > > > > > > I think it is a good idea. Currently, our users also report a
>> > > > > > > > > > > similar issue with the resourceId of standalone clusters. When we
>> > > > > > > > > > > start a standalone cluster now, the `TaskManagerRunner` always
>> > > > > > > > > > > generates a UUID for the resourceId. It is used to register with
>> > > > > > > > > > > the JobManager, and it is not convenient to match it with the real
>> > > > > > > > > > > TaskManager, especially in container environments.
>> > > > > > > > > > >
>> > > > > > > > > > > I think a possible solution is to support a user-defined
>> > > > > > > > > > > resourceId. We could get it from the environment. For standalone
>> > > > > > > > > > > on K8s, we could set the "RESOURCE_ID" env to the pod name so that
>> > > > > > > > > > > it is easier to match the TaskManager with its K8s pod.
>> > > > > > > > > > >
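Yang's suggestion could look roughly like the following sketch. It is hypothetical: the class and method names are invented for illustration, only the `RESOURCE_ID` variable name comes from this thread, and it does not describe an existing Flink option. As Yangze notes in his reply, users would have to guarantee that the IDs do not collide across TaskManagers.

```java
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch of the suggestion in this thread: prefer a user-provided
 * RESOURCE_ID environment variable and fall back to a random UUID otherwise.
 */
final class ResourceIdResolver {
    static final String ENV_KEY = "RESOURCE_ID"; // variable name taken from the discussion

    private ResourceIdResolver() {}

    /** Takes the environment as a map so it can be tested; pass System.getenv() in real use. */
    static String resolve(Map<String, String> env) {
        String fromEnv = env.get(ENV_KEY);
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            // e.g. the pod name, so that TaskManager logs can be matched to the K8s pod
            return fromEnv.trim();
        }
        // No user-defined ID: keep the current behavior of generating a random UUID
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println("TaskManager resource ID: " + resolve(System.getenv()));
    }
}
```

On Kubernetes, the pod name can be exposed to the container as an environment variable through the Downward API (an `env` entry whose `valueFrom.fieldRef.fieldPath` is `metadata.name`).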
>> > > > > > > > > > > Moreover, I am afraid we cannot set the pod name to the
>> > > > > > > > > > > resourceId. I think you could only set "deployment.meta.name",
>> > > > > > > > > > > since the pod name is generated by K8s in the pattern
>> > > > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we will do the
>> > > > > > > > > > > opposite and set the resourceId to the pod name.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Yang
>> > > > > > > > > > >
>> > > > > > > > > > > Konstantin Knauf <ko...@ververica.com> wrote on Sun, Mar 29, 2020 at 8:06 PM:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Yangze, Hi Till,
>> > > > > > > > > > > >
>> > > > > > > > > > > > thank you for working on this topic. I believe it will make
>> > > > > > > > > > > > debugging large Apache Flink deployments much more feasible.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I was wondering whether it would make sense to allow the user to
>> > > > > > > > > > > > specify the ResourceID in standalone setups? For example, many
>> > > > > > > > > > > > users still implicitly use standalone clusters on Kubernetes (the
>> > > > > > > > > > > > native support is still experimental), and in these cases it would
>> > > > > > > > > > > > be interesting to also set the PodName as the ResourceID. What do
>> > > > > > > > > > > > you think?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Cheers,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Konstantin
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <trohrmann@apache.org> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi Yangze,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > thanks for creating this FLIP. I think it is a very good
>> > > > > > > > > > > > > improvement that helps our users and ourselves better understand
>> > > > > > > > > > > > > what's going on in Flink.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a
>> > > > > > > > > > > > > good idea.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Also, deriving ExecutionGraph IDs from their superset ID is a
>> > > > > > > > > > > > > good idea.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make it
>> > > > > > > > > > > > > a composition of the ResourceID + a monotonically increasing
>> > > > > > > > > > > > > number. The problem is that in case of an RM failure, the
>> > > > > > > > > > > > > InstanceIDs would start from 0 again, and this could lead to
>> > > > > > > > > > > > > collisions.
>> > > > > > > > > > > > >
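Till's fencing argument can be illustrated with a small simulation. The classes below are hypothetical: `CounterInstanceIds` models the rejected scheme (a ResourceID string plus a counter held only in ResourceManager memory), and an RM failover is simulated by creating a fresh generator. A registration issued before the failover then carries exactly the same token as a new one, so it could not be fenced off; with randomly generated IDs, such a repeat is practically impossible.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical model of the rejected scheme: ResourceID plus a counter that lives
 * only in ResourceManager memory and therefore restarts at 0 after an RM failover.
 */
final class CounterInstanceIds {
    private long next = 0;

    String nextId(String resourceId) {
        return resourceId + "_" + next++;
    }
}

final class FencingCollisionDemo {
    private FencingCollisionDemo() {}

    /** Registers the same TaskExecutor before and after a simulated RM failover. */
    static boolean counterSchemeRepeatsToken() {
        Set<String> issued = new HashSet<>();
        issued.add(new CounterInstanceIds().nextId("taskmanager-1")); // first RM leader
        CounterInstanceIds afterFailover = new CounterInstanceIds(); // counter state is lost
        // add() returns false if the token was issued before: old and new are indistinguishable
        return !issued.add(afterFailover.nextId("taskmanager-1"));
    }

    /** The same scenario with randomly generated IDs. */
    static boolean randomSchemeRepeatsToken() {
        Set<String> issued = new HashSet<>();
        issued.add(UUID.randomUUID().toString());
        return !issued.add(UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        System.out.println("counter scheme repeats token: " + counterSchemeRepeatsToken());
        System.out.println("random scheme repeats token:  " + randomSchemeRepeatsToken());
    }
}
```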
>> > > > > > > > > > > > > Logging more information on how the different runtime IDs are
>> > > > > > > > > > > > > correlated is also a good idea.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Two other ideas for simplifying the IDs are the following:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a
>> > > > > > > > > > > > > separate RpcEndpoint a while ago. With this no longer being the
>> > > > > > > > > > > > > case, I think we could remove the SlotRequestID and replace it
>> > > > > > > > > > > > > with the AllocationID.
>> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi-task slots, one
>> > > > > > > > > > > > > could derive them from the SlotRequestID used for requesting the
>> > > > > > > > > > > > > underlying AllocatedSlot.
>> > > > > > > > > > > > >
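Till's second idea, deriving the IDs of shared-slot requests from the parent request, could be sketched as follows. `DerivedRequestId` is a hypothetical class invented for illustration, and `java.util.UUID` is used as a stand-in for the parent SlotRequestID type; Flink's real slot classes are not modeled here.

```java
import java.util.Objects;
import java.util.UUID;

/**
 * Hypothetical ID for a logical slot inside a shared (multi-task) slot, derived from
 * the request ID of the underlying allocated slot instead of being freshly generated.
 */
final class DerivedRequestId {
    private final UUID parentRequestId; // request ID used for the underlying AllocatedSlot
    private final int childIndex;       // index of the logical slot within the shared slot

    DerivedRequestId(UUID parentRequestId, int childIndex) {
        this.parentRequestId = Objects.requireNonNull(parentRequestId, "parentRequestId");
        this.childIndex = childIndex;
    }

    UUID getParentRequestId() {
        return parentRequestId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DerivedRequestId)) return false;
        DerivedRequestId other = (DerivedRequestId) o;
        return childIndex == other.childIndex && parentRequestId.equals(other.parentRequestId);
    }

    @Override
    public int hashCode() {
        return 31 * parentRequestId.hashCode() + childIndex;
    }

    /** The parent request is part of the string, so the lineage is visible in logs. */
    @Override
    public String toString() {
        return parentRequestId + "/" + childIndex;
    }
}
```

Compared with independently generated IDs, the derived form needs no extra log line to connect a logical slot to the request for its underlying slot.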
>> > > > > > > > > > > > > Given that the slot sharing logic will most likely be reworked
>> > > > > > > > > > > > > with the pipelined region scheduling, we might be able to resolve
>> > > > > > > > > > > > > these two points as part of the pipelined region scheduling effort.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Cheers,
>> > > > > > > > > > > > > Till
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <karmagyz@gmail.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118: Improve
>> > > > > > > > > > > > > > Flink’s ID system"[1].
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > This FLIP mainly discusses the following issues, aiming to enhance
>> > > > > > > > > > > > > > the readability of IDs in logs and help users debug in case of
>> > > > > > > > > > > > > > failures:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Enhance the readability of the string literals of IDs. Most of
>> > > > > > > > > > > > > > them are hash codes, e.g. ExecutionAttemptID, which do not provide
>> > > > > > > > > > > > > > much meaningful information and are hard for users to recognize
>> > > > > > > > > > > > > > and compare.
>> > > > > > > > > > > > > > - Log the IDs' lineage information to make debugging more
>> > > > > > > > > > > > > > convenient. Currently, the log does not always show the lineage
>> > > > > > > > > > > > > > information between IDs. Finding out the relationships between
>> > > > > > > > > > > > > > entities identified by given IDs is a common demand, e.g., which
>> > > > > > > > > > > > > > slot (by AllocationID) is assigned to satisfy the slot request
>> > > > > > > > > > > > > > with a given SlotRequestID. In the absence of such lineage
>> > > > > > > > > > > > > > information, it is currently impossible to track the end-to-end
>> > > > > > > > > > > > > > lifecycle of an Execution or a Task, which makes debugging
>> > > > > > > > > > > > > > difficult.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > - Add location information to distributed components
>> > > > > > > > > > > > > > - Add topology information to graph components
>> > > > > > > > > > > > > > - Log the IDs' lineage information
>> > > > > > > > > > > > > > - Expose the identifiers of distributed components to users
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking
>> > > > > > > > > > > > > > forward to your feedback.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > Yangze Guo
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > >
>> > > > > > > > > > > > Konstantin Knauf | Head of Product
>> > > > > > > > > > > >
>> > > > > > > > > > > > +49 160 91394525
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/>
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > >
>> > > > > > > > > > > > Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>> > > > > > > > > > > >
>> > > > > > > > > > > > Stream Processing | Event Driven | Real Time
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > >
>> > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > > Ververica GmbH
>> > > > > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> > > > > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > >
>> > > > > > >
>> > > > > >
>> >

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Till Rohrmann <tr...@apache.org>.
This is good news, Yangze. Decreasing the size of our IDs is a really nice
side effect :-)

Hence, +1 from my side as well.

Cheers,
Till
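Zhu Zhu's explanation of the smaller serialized TDD can be checked with a small experiment that uses the same measurement approach as Yangze's snippet (Java serialization into a `ByteArrayOutputStream`). All classes below are hypothetical stand-ins, not Flink's: `OldPartitionId` holds its own two longs, while `NewPartitionId` holds a reference to one shared `DataSetId` plus an int. Java serialization writes a repeated reference to an already-written object as a short back-reference handle, so the shared data set ID is written in full only once.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializedSizeDemo {
    private SerializedSizeDemo() {}

    /** Old layout (hypothetical): each partition ID carries its own 128-bit value. */
    static final class OldPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upper;
        final long lower;

        OldPartitionId(long upper, long lower) {
            this.upper = upper;
            this.lower = lower;
        }
    }

    /** Hypothetical stand-in for IntermediateDataSetID. */
    static final class DataSetId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upper;
        final long lower;

        DataSetId(long upper, long lower) {
            this.upper = upper;
            this.lower = lower;
        }
    }

    /** New layout (hypothetical): a reference to a shared data set ID plus an index. */
    static final class NewPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final DataSetId dataSetId;
        final int partitionNum;

        NewPartitionId(DataSetId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Same idea as the benchmark snippet: Java-serialize one object graph and count the bytes. */
    static int serializedSize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    static int oldLayoutSize(int numPartitions) {
        OldPartitionId[] ids = new OldPartitionId[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            ids[i] = new OldPartitionId(31L * i + 7, 17L * i + 3); // distinct IDs, nothing shared
        }
        return serializedSize(ids);
    }

    static int newLayoutSize(int numPartitions) {
        DataSetId shared = new DataSetId(42L, 4242L); // one data set produces all partitions
        NewPartitionId[] ids = new NewPartitionId[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            ids[i] = new NewPartitionId(shared, i);
        }
        return serializedSize(ids);
    }

    public static void main(String[] args) {
        int n = 1000;
        System.out.println("old layout: " + oldLayoutSize(n) + " bytes");
        System.out.println("new layout: " + newLayoutSize(n) + " bytes");
    }
}
```

Running `main` should report a noticeably smaller size for the new layout; the exact byte counts depend on class names and the serialization format, so only the relative comparison is meaningful. Whether real TDDs benefit in the same way depends on the IDs actually being shared object instances, which is Zhu Zhu's guess rather than something this sketch verifies.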

On Tue, Apr 14, 2020 at 9:54 AM Zhu Zhu <re...@gmail.com> wrote:

> Thanks for doing this benchmark, @Yangze Guo <ka...@gmail.com>.
> The result looks promising, so I would +1 refactoring ExecutionAttemptID
> and IntermediateResultPartitionID.
>
> Regarding why "the size of the TDD after serialization becomes smaller than
> before", I guess it's because the new IntermediateResultPartitionIDs can
> share the same IntermediateDataSetID. In this way, an
> IntermediateResultPartitionID only takes a reference (to the
> IntermediateDataSetID) and an int (the index), which is smaller than the
> two longs of an AbstractID.
>
> Thanks,
> Zhu Zhu
>
> Yangze Guo <ka...@gmail.com> wrote on Tue, Apr 14, 2020 at 3:09 PM:
>
> > Hi everyone,
> >
> > I've investigated the impact on jobs with higher parallelism.
> >
> > The results show:
> > - The size of the TDD after serialization becomes smaller than before,
> > and I did not run into any issue with the Akka frame size when the
> > parallelism was set to 6000.
> > - There was no significant difference in the end-to-end scheduling
> > time, job runtime, young GC count, or total full GC time.
> >
> > For details, please take a look at
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > From my perspective, it might be OK to refactor
> > ExecutionAttemptID and IntermediateResultPartitionID. If you have
> > further concerns or think we should investigate further, please
> > let me know.
> >
> > Best,
> > Yangze Guo
> >
> > On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <ka...@gmail.com> wrote:
> > >
> > > Hi everyone,
> > > After an offline discussion with ZhuZhu, I have some comments on this
> > > investigation.
> > >
> > > Regarding the maximum parallelism dropping from 760 to 685, it may be
> > > because the tasks are not scheduled evenly. The result is inconsistent
> > > across multiple experiments, so this phenomenon is probably unrelated
> > > to our changes.
> > >
> > > I think what we really care about is the Akka frame size (i.e., whether
> > > users need to enlarge it for the same job after our change). The size of
> > > the TDD after serialization seems to be smaller after the change. I don't
> > > know the root cause of this phenomenon at the moment. The way I measure it is:
> > > ```
> > > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > > ObjectOutputStream oos = new ObjectOutputStream(bos);
> > > oos.writeObject(deployment);
> > > oos.flush();
> > > LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
> > > ```
> > > Please correct me if I'm wrong.
> > >
> > > I'll experiment with higher parallelism to see if there is any
> > > regression regarding Akka framesize.
> > >
> > > Regarding the TDD building time, the parallelism in my investigation
> > > might be too small. Moreover, this time might be influenced by many
> > > factors. Thus, I'll
> > > - experiment with higher parallelism.
> > > - measure the time spent from "Starting scheduling" until the last task
> > > changes its state to RUNNING.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <ka...@gmail.com> wrote:
> > > >
> > > > Hi there,
> > > >
> > > > Sorry for the belated reply. I have just made a preliminary investigation
> > > > of the impact of refactoring IntermediateResultPartitionID.
> > > >
> > > > The key change is to compose it of an IntermediateDataSetID
> > > > and a partitionNum:
> > > > public class IntermediateResultPartitionID {
> > > >    private final IntermediateDataSetID intermediateDataSetID;
> > > >    private final int partitionNum;
> > > > }
> > > >
> > > > In this benchmark, I used examples/streaming/WordCount.jar as the test
> > > > job and ran Flink on YARN. All configurations were kept at their defaults
> > > > except for "taskmanager.numberOfTaskSlots", which was set to 2.
> > > >
> > > > The results show that it does have an impact on performance.
> > > > - After this change, the maximum parallelism went from 760 to 685,
> > > > which is limited by the total number of network buffers. For the same
> > > > parallelism, users need more network buffers than before.
> > > > - The Netty messages "PartitionRequest" and "TaskEventRequest" grow
> > > > by 4 bytes. For "PartitionRequest", this is a 7% increase.
> > > > - The TDD takes longer to construct. With a parallelism of 600, it takes
> > > > twice as long to construct a TDD as before.
> > > >
> > > > Details are recorded in
> > > > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> > > >
> > > > The same issue could happen with ExecutionAttemptID, which would increase
> > > > "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex
> > > > and attemptNumber). However, it may not affect the TDD as much as
> > > > IntermediateResultPartitionID, since there is only one
> > > > ExecutionAttemptID per TDD.
> > > >
> > > > Based on this preliminary investigation, I tend not to refactor
> > > > ExecutionAttemptID and IntermediateResultPartitionID for now, and to
> > > > treat it as future work.
> > > >
> > > > WDYT? @ZhuZhu @Till
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
> > > > >
> > > > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > > > You are right. JobVertexID is widely used, and reworking it may affect
> > > > > public interfaces, e.g. the REST API. We can take it as a long-term goal
> > > > > and exclude it from this FLIP.
> > > > > The same applies to IntermediateDataSetID, which could also be composed
> > > > > of a JobID and an index, as Till proposed.
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Till Rohrmann <tr...@apache.org> wrote on Tue, Mar 31, 2020 at 8:36 PM:
> > > > >
> > > > > > For the IntermediateDataSetID, I was just thinking that it might
> > > > > > actually be interesting to know which job produced the result, which,
> > > > > > by using cluster partitions, could be used across different jobs. Not
> > > > > > saying that we have to do it, though.
> > > > > >
> > > > > > A small addition to Zhu Zhu's comment about TDD sizes: for the problem
> > > > > > of too large TDDs there is already an issue [1]. The current suspicion
> > > > > > is that the size of TDDs for jobs with a large parallelism can indeed
> > > > > > become problematic for Flink. Hence, it would be great to investigate
> > > > > > the impact of the proposed changes.
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <ka...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, Zhu,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > > make JobVertexID a composition of JobID and a topology index
> > > > > > > I think it is a good idea. However, it seems that JobVertexID is
> > > > > > > derived from a hash code, which is used to identify vertices across
> > > > > > > submissions. I'm not familiar with that component, though. I prefer to
> > > > > > > keep this idea out of the scope of this FLIP unless someone can help us
> > > > > > > figure it out.
> > > > > > >
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > > > > think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross job result usages in interactive query cases.
> > > > > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > > > > from JobVertexID.
> > > > > > >
> > > > > > > > The new IDs will become larger with this rework.
> > > > > > > Yes, I have the same concern. A benchmark is necessary; I'll try to
> > > > > > > provide one during the implementation phase.
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <re...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall
> > > > > > > > proposal. It can help a lot in troubleshooting.
> > > > > > > >
> > > > > > > > Here are a few questions about it:
> > > > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> > > > > > > > index? This would help in the session cluster case, so that we can
> > > > > > > > identify which tasks are from which jobs, along with the rework of
> > > > > > > > ExecutionAttemptID.
> > > > > > > >
> > > > > > > > 2. You mentioned "Add the producer info to the string literal of
> > > > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > > > > > composition of JobVertexID and a consumer index?
> > > > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > > > > JobVertexID, but just print the producing relationships in logs? I
> > > > > > > > think keeping IntermediateDataSetID independent may be better
> > > > > > > > considering the cross-job result usages in interactive query cases.
> > > > > > > >
> > > > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > > > > > composed of a variety of IDs. I'm not sure how much larger it would
> > > > > > > > be, but there can be more memory and CPU cost for it, resulting in
> > > > > > > > more frequent GCs, message sizes exceeding the Akka frame size limit,
> > > > > > > > and longer blocking of the main thread.
> > > > > > > > This should not be a problem in most cases but might be a problem for
> > > > > > > > large-scale jobs. Shall we have a benchmark for it?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zhu Zhu
> > > > > > > >
> > > > > > > > Yangze Guo <ka...@gmail.com> wrote on Tue, Mar 31, 2020 at 2:19 PM:
> > > > > > > >
> > > > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > > > > >
> > > > > > > > > @Till
> > > > > > > > > I'm +1 for your two ideas, and I'd like to move them out of the scope
> > > > > > > > > of this FLIP since the pipelined region scheduling is ongoing work.
> > > > > > > > > I also agree that we should not compose the InstanceID in
> > > > > > > > > TaskExecutorConnection of the ResourceID plus a monotonically
> > > > > > > > > increasing value. Thanks a lot for your explanation.
> > > > > > > > >
> > > > > > > > > @Konstantin @Yang
> > > > > > > > > Regarding the PodName of the TaskExecutor on K8s, I second Yang's
> > > > > > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
> > > > > > > > > make the TM respect it. Users need to guarantee that there are no
> > > > > > > > > collisions between different TMs.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yangze Guo
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <stevenz3wu@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > +1 on allowing a user-defined resourceId for the TaskManager
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <danrtsey.wy@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Konstantin,
> > > > > > > > > > >
> > > > > > > > > > > I think it is a good idea. Currently, our users also report a
> > > > > > > > > > > similar issue with the resourceId of standalone clusters. When we
> > > > > > > > > > > start a standalone cluster now, the `TaskManagerRunner` always
> > > > > > > > > > > generates a UUID for the resourceId. It is used to register with
> > > > > > > > > > > the JobManager, and it is not convenient to match it with the real
> > > > > > > > > > > TaskManager, especially in container environments.
> > > > > > > > > > >
> > > > > > > > > > > I think a possible solution is to support a user-defined
> > > > > > > > > > > resourceId. We could get it from the environment. For standalone
> > > > > > > > > > > on K8s, we could set the "RESOURCE_ID" env to the pod name so that
> > > > > > > > > > > it is easier to match the TaskManager with its K8s pod.
> > > > > > > > > > >
> > > > > > > > > > > Moreover, I am afraid we cannot set the pod name to the
> > > > > > > > > > > resourceId. I think you could only set "deployment.meta.name",
> > > > > > > > > > > since the pod name is generated by K8s in the pattern
> > > > > > > > > > > {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we will do the
> > > > > > > > > > > opposite and set the resourceId to the pod name.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yang
> > > > > > > > > > >
> > > > > > > > > > > Konstantin Knauf <ko...@ververica.com> wrote on Sun, Mar 29, 2020 at 8:06 PM:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > > > > >
> > > > > > > > > > > > thank you for working on this topic. I believe it will make
> > > > > > > > > > > > debugging large Apache Flink deployments much more feasible.
> > > > > > > > > > > >
> > > > > > > > > > > > I was wondering whether it would make sense to allow the user to
> > > > > > > > > > > > specify the ResourceID in standalone setups? For example, many
> > > > > > > > > > > > users still implicitly use standalone clusters on Kubernetes (the
> > > > > > > > > > > > native support is still experimental), and in these cases it would
> > > > > > > > > > > > be interesting to also set the PodName as the ResourceID. What do
> > > > > > > > > > > > you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantin
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <
> > > > > > > trohrmann@apache.org>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Yangze,
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks for creating this FLIP. I think it is a very
> > good
> > > > > > > > > improvement
> > > > > > > > > > > > > helping our users and ourselves understanding
> better
> > what's
> > > > > > > going
> > > > > > > > > on in
> > > > > > > > > > > > > Flink.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Creating the ResourceIDs with host information/pod
> > name is a
> > > > > > > good
> > > > > > > > > idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Also deriving ExecutionGraph IDs from their
> superset
> > ID is a
> > > > > > > good
> > > > > > > > > idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The InstanceID is used for fencing purposes. I
> would
> > not make
> > > > > > > it a
> > > > > > > > > > > > > composition of the ResourceID + a monotonically
> > increasing
> > > > > > > number.
> > > > > > > > > The
> > > > > > > > > > > > > problem is that in case of a RM failure the
> > InstanceIDs would
> > > > > > > start
> > > > > > > > > > > from
> > > > > > > > > > > > 0
> > > > > > > > > > > > > again and this could lead to collisions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Logging more information on how the different
> > runtime IDs are
> > > > > > > > > > > correlated
> > > > > > > > > > > > is
> > > > > > > > > > > > > also a good idea.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Two other ideas for simplifying the ids are the
> > following:
> > > > > > > > > > > > >
> > > > > > > > > > > > > * The SlotRequestID was introduced because the
> > SlotPool was a
> > > > > > > > > separate
> > > > > > > > > > > > > RpcEndpoint a while ago. With this no longer being
> > the case I
> > > > > > > > > think we
> > > > > > > > > > > > > could remove the SlotRequestID and replace it with
> > the
> > > > > > > > > AllocationID.
> > > > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi
> > task slots
> > > > > > > one
> > > > > > > > > could
> > > > > > > > > > > > > derive them from the SlotRequestID used for
> > requesting the
> > > > > > > > > underlying
> > > > > > > > > > > > > AllocatedSlot.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Given that the slot sharing logic will most likely
> be
> > > > > > reworked
> > > > > > > > > with the
> > > > > > > > > > > > > pipelined region scheduling, we might be able to
> > resolve
> > > > > > these
> > > > > > > two
> > > > > > > > > > > points
> > > > > > > > > > > > > as part of the pipelined region scheduling effort.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Till
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <
> > > > > > > karmagyz@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We would like to start a discussion thread on
> > "FLIP-118:
> > > > > > > Improve
> > > > > > > > > > > > > > Flink’s ID system"[1].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This FLIP mainly discusses the following issues,
> > target to
> > > > > > > > > enhance
> > > > > > > > > > > the
> > > > > > > > > > > > > > readability of IDs in log and help user to debug
> > in case of
> > > > > > > > > failures:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Enhance the readability of the string literals
> > of IDs.
> > > > > > > Most of
> > > > > > > > > them
> > > > > > > > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do
> > not
> > > > > > provide
> > > > > > > much
> > > > > > > > > > > > > > meaningful information and are hard to recognize
> > and
> > > > > > compare
> > > > > > > for
> > > > > > > > > > > > > > users.
> > > > > > > > > > > > > > - Log the ID’s lineage information to make
> > debugging more
> > > > > > > > > convenient.
> > > > > > > > > > > > > > Currently, the log fails to always show the
> lineage
> > > > > > > information
> > > > > > > > > > > > > > between IDs. Finding out relationships between
> > entities
> > > > > > > > > identified by
> > > > > > > > > > > > > > given IDs is a common demand, e.g., slot of which
> > > > > > > AllocationID is
> > > > > > > > > > > > > > assigned to satisfy slot request of with
> > SlotRequestID.
> > > > > > > Absence
> > > > > > > > > of
> > > > > > > > > > > > > > such lineage information, it’s impossible to
> track
> > the end
> > > > > > > to end
> > > > > > > > > > > > > > lifecycle of an Execution or a Task now, which
> > makes
> > > > > > > debugging
> > > > > > > > > > > > > > difficult.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Add location information to distributed
> > components
> > > > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > > > - Log the ID’s lineage information
> > > > > > > > > > > > > > - Expose the identifier of distributing component
> > to user
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please find more details in the FLIP wiki
> document
> > [1].
> > > > > > > Looking
> > > > > > > > > > > forward
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > your feedbacks.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Yangze Guo
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > >
> > > > > > > > > > > > Konstantin Knauf | Head of Product
> > > > > > > > > > > >
> > > > > > > > > > > > +49 160 91394525
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Follow us @VervericaData Ververica <
> > https://www.ververica.com/
> > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > >
> > > > > > > > > > > > Join Flink Forward <https://flink-forward.org/> -
> The
> > Apache
> > > > > > > Flink
> > > > > > > > > > > > Conference
> > > > > > > > > > > >
> > > > > > > > > > > > Stream Processing | Event Driven | Real Time
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > >
> > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin,
> > Germany
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Ververica GmbH
> > > > > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244
> B
> > > > > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip
> > Park Tung
> > > > > > > Jason,
> > > > > > > > > Ji
> > > > > > > > > > > > (Tony) Cheng
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> >
>
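The user-defined resourceId suggested in the quoted reply (read it from a "RESOURCE_ID" environment variable, otherwise keep generating a random UUID) could look roughly like the sketch below. It is only an illustration under that assumption: `ResourceIdResolver` and `resolve` are hypothetical names, not Flink's actual `TaskManagerRunner` code, and the ID is modelled as a plain `String` rather than Flink's `ResourceID` type.

```java
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper that picks a TaskManager resource ID from the environment. */
public final class ResourceIdResolver {

    /** Environment variable name taken from the proposal in the thread. */
    static final String RESOURCE_ID_ENV = "RESOURCE_ID";

    private ResourceIdResolver() {}

    /**
     * Returns the trimmed value of RESOURCE_ID if it is set and non-blank,
     * otherwise a random UUID (the behaviour described for standalone setups today).
     */
    static String resolve(Map<String, String> env) {
        String userDefined = env.get(RESOURCE_ID_ENV);
        if (userDefined != null && !userDefined.trim().isEmpty()) {
            return userDefined.trim();
        }
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // e.g. run with RESOURCE_ID=my-taskmanager-pod set in the environment
        System.out.println("Using resource ID: " + resolve(System.getenv()));
    }
}
```

On Kubernetes, the pod name can be injected into such a variable with the Downward API (`valueFrom.fieldRef.fieldPath: metadata.name`). Once users supply the value themselves, the random fallback no longer guarantees uniqueness, so each TaskManager would need a distinct value.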

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Zhu Zhu <re...@gmail.com>.
Thanks for doing this benchmark, @Yangze Guo <ka...@gmail.com>.
The result looks promising, so I would +1 refactoring ExecutionAttemptID
and IntermediateResultPartitionID.

Regarding why "the size of the TDD after serialization became smaller than
before": I guess it's because the new IntermediateResultPartitionIDs can
share the same IntermediateDataSetID instance. In this way, each
IntermediateResultPartitionID only takes a reference (to the shared
IntermediateDataSetID) plus an int (the index), which is smaller than the
2 longs of an AbstractID.
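Zhu Zhu's guess can be checked in isolation. With Java serialization (`ObjectOutputStream`), an object that was already written to the same stream is written again only as a short back-reference handle, not as a second full copy. The sketch below compares the serialized size of 1,000 two-long IDs with 1,000 IDs that each hold a reference to one shared parent ID plus an int index. `FlatPartitionId`, `DataSetId` and `CompositePartitionId` are hypothetical stand-ins, not Flink's classes, and the comparison assumes all partition IDs really point to the same parent instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Random;

public final class IdSerializationSize {

    /** Old style: an ID made of two random longs, like an AbstractID (hypothetical stand-in). */
    static final class FlatPartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upperPart;
        final long lowerPart;

        FlatPartitionId(long upperPart, long lowerPart) {
            this.upperPart = upperPart;
            this.lowerPart = lowerPart;
        }
    }

    /** Hypothetical stand-in for IntermediateDataSetID (also two longs). */
    static final class DataSetId implements Serializable {
        private static final long serialVersionUID = 1L;
        final long upperPart;
        final long lowerPart;

        DataSetId(long upperPart, long lowerPart) {
            this.upperPart = upperPart;
            this.lowerPart = lowerPart;
        }
    }

    /** New style: a reference to the (shared) data set ID plus a partition index. */
    static final class CompositePartitionId implements Serializable {
        private static final long serialVersionUID = 1L;
        final DataSetId dataSetId;
        final int partitionNum;

        CompositePartitionId(DataSetId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }
    }

    /** Number of bytes Java serialization produces for the given object graph. */
    static int serializedSize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.size();
    }

    static ArrayList<FlatPartitionId> flatIds(int n) {
        Random random = new Random(42);
        ArrayList<FlatPartitionId> ids = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            ids.add(new FlatPartitionId(random.nextLong(), random.nextLong()));
        }
        return ids;
    }

    static ArrayList<CompositePartitionId> compositeIds(int n) {
        Random random = new Random(42);
        DataSetId shared = new DataSetId(random.nextLong(), random.nextLong());
        ArrayList<CompositePartitionId> ids = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            ids.add(new CompositePartitionId(shared, i)); // every ID points to the same instance
        }
        return ids;
    }

    public static void main(String[] args) throws IOException {
        int n = 1000;
        System.out.println("flat IDs:      " + serializedSize(flatIds(n)) + " bytes");
        System.out.println("composite IDs: " + serializedSize(compositeIds(n)) + " bytes");
    }
}
```

If each composite ID instead pointed to its own copy of the parent, every copy would be written in full and the composite form would end up larger than the flat one.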

Thanks,
Zhu Zhu


Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Hi everyone,

I've investigated the impact with higher-parallelism jobs.

The result shows:
- The size of the TDD after serialization became smaller than before,
and I did not run into any issue with the Akka frame size when the
parallelism was set to 6000.
- There was no significant difference in end-to-end scheduling time,
job runtime, young GC count, or total full GC time.

For details, please take a look at
https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing

From my perspective, I think it might be OK to refactor
ExecutionAttemptID and IntermediateResultPartitionID. If you have
further concerns or think we should investigate further, please
let me know.

Best,
Yangze Guo
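The +4 and +8 bytes per message reported in the benchmark in this thread follow directly from a fixed-width binary layout. The sketch below assumes such a layout: a 16-byte random base ID (two longs), extended with one int for a partition index, or with two ints for the subtask index and attempt number. `BaseId`, `PartitionId` and `AttemptId` are hypothetical classes, not Flink's, and the `toString` formats only illustrate the kind of readable, lineage-carrying IDs the FLIP aims for.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;

public final class CompositeIdWireSizes {

    /** Hypothetical 16-byte random ID made of two longs, like an AbstractID. */
    static final class BaseId {
        final long upper;
        final long lower;

        BaseId(long upper, long lower) { this.upper = upper; this.lower = lower; }

        void writeTo(ByteBuffer buf) { buf.putLong(upper).putLong(lower); }

        @Override
        public String toString() { return String.format("%016x%016x", upper, lower); }
    }

    /** Hypothetical partition ID = producing data set ID + partition index. */
    static final class PartitionId {
        final BaseId dataSetId;
        final int partitionNum;

        PartitionId(BaseId dataSetId, int partitionNum) {
            this.dataSetId = dataSetId;
            this.partitionNum = partitionNum;
        }

        void writeTo(ByteBuffer buf) {
            dataSetId.writeTo(buf);
            buf.putInt(partitionNum);
        }

        @Override
        public String toString() { return dataSetId + "#" + partitionNum; }
    }

    /** Hypothetical attempt ID = vertex ID + subtask index + attempt number. */
    static final class AttemptId {
        final BaseId vertexId;
        final int subtaskIndex;
        final int attemptNumber;

        AttemptId(BaseId vertexId, int subtaskIndex, int attemptNumber) {
            this.vertexId = vertexId;
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
        }

        void writeTo(ByteBuffer buf) {
            vertexId.writeTo(buf);
            buf.putInt(subtaskIndex).putInt(attemptNumber);
        }

        @Override
        public String toString() { return vertexId + "_" + subtaskIndex + "_" + attemptNumber; }
    }

    /** Returns how many bytes the given writer puts into a fresh buffer. */
    static int encodedSize(Consumer<ByteBuffer> writer) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writer.accept(buf);
        return buf.position();
    }

    public static void main(String[] args) {
        BaseId base = new BaseId(0x1234L, 0xabcdL);
        PartitionId partition = new PartitionId(base, 3);
        AttemptId attempt = new AttemptId(base, 3, 1);
        System.out.println(base + " -> " + encodedSize(base::writeTo) + " bytes");           // 16 bytes
        System.out.println(partition + " -> " + encodedSize(partition::writeTo) + " bytes"); // 20 bytes (+4)
        System.out.println(attempt + " -> " + encodedSize(attempt::writeTo) + " bytes");     // 24 bytes (+8)
    }
}
```

Because the extra ints are appended to the 16-byte base, a byte-oriented message that embeds one of these IDs grows by exactly those bytes, regardless of how many IDs share a parent.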

On Mon, Apr 13, 2020 at 1:36 PM Yangze Guo <ka...@gmail.com> wrote:
>
> Hi everyone,
> After an offline discussion with ZhuZhu, I have some comments on this
> investigation.
>
> Regarding the maximum parallelism dropping from 760 to 685: it may be
> because the tasks are not scheduled evenly. The result is inconsistent
> across multiple experiments, so this phenomenon is probably unrelated
> to our changes.
>
> I think what we really care about is the Akka frame size (i.e., whether
> users would need to enlarge it after our change for the same job). The
> size of the TDD after serialization seems to be smaller after the change.
> I don't know the root cause of this at the moment. The way I measure it is:
> ```
> // Java-serialize the TaskDeploymentDescriptor ("deployment") and log its size.
> // Requires java.io.ByteArrayOutputStream and java.io.ObjectOutputStream.
> ByteArrayOutputStream bos = new ByteArrayOutputStream();
> try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
>     oos.writeObject(deployment); // close() flushes the stream
> }
> LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.size());
> ```
> Please correct me if I'm wrong.
>
> I'll experiment with higher parallelism to see if there is any
> regression regarding the Akka frame size.
>
> Regarding the TDD construction time, the parallelism in my investigation
> might have been too small, and this time can be influenced by many
> factors. Thus, I'll
> - experiment with higher parallelism.
> - measure the time spent from "Starting scheduling" until the last task
> switches to RUNNING.
>
> Best,
> Yangze Guo
>
>
> On Fri, Apr 10, 2020 at 12:53 PM Yangze Guo <ka...@gmail.com> wrote:
> >
> > Hi there,
> >
> > Sorry for the belated reply. I just made a preliminary investigation
> > of the impact of refactoring IntermediateResultPartitionID.
> >
> > The key change is composing it of an IntermediateDataSetID and a
> > partitionNum:
> > ```
> > public class IntermediateResultPartitionID {
> >     private final IntermediateDataSetID intermediateDataSetID; // shared ID of the producing data set
> >     private final int partitionNum;                            // index of the partition in that data set
> >
> >     public IntermediateResultPartitionID(IntermediateDataSetID intermediateDataSetID, int partitionNum) {
> >         this.intermediateDataSetID = intermediateDataSetID;
> >         this.partitionNum = partitionNum;
> >     }
> > }
> > ```
> >
> > In this benchmark, I used examples/streaming/WordCount.jar as the test
> > job and ran Flink on YARN. All configurations were kept at their defaults
> > except for "taskmanager.numberOfTaskSlots", which was set to 2.
> >
> > The result shows that it does have an impact on performance:
> > - After this change, the maximum parallelism went from 760 to 685,
> > limited by the total number of network buffers. For the same
> > parallelism, users need more network buffers than before.
> > - The Netty messages "PartitionRequest" and "TaskEventRequest" grow
> > by 4 bytes. For "PartitionRequest", this is a 7% increase.
> > - The TDD takes longer to construct. With a parallelism of 600, it
> > takes twice as long to construct the TDD as before.
> >
> > Details are recorded in
> > https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing
> >
> > The same issue could happen with ExecutionAttemptID, which would grow
> > "PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex
> > and attemptNumber). But it may not affect the TDD as much as
> > IntermediateResultPartitionID, since there is only one
> > ExecutionAttemptID per TDD.
> >
> > After this preliminary investigation, I tend not to refactor
> > ExecutionAttemptID and IntermediateResultPartitionID for now, or to
> > treat it as future work.
> >
> > WDYT? @ZhuZhu @Till
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
> > >
> > > >> However, it seems the JobVertexID is derived from hashcode ...
> > > You are right. JobVertexID is widely used and reworking it may affect the
> > > public
> > > interfaces, e.g. REST api. We can take it as a long term goal and exclude
> > > it from this FLIP.
> > > This same applies to IntermediateDataSetID, which can be also composed of a
> > > JobID
> > > and an index as Till proposed.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Till Rohrmann <tr...@apache.org> 于2020年3月31日周二 下午8:36写道:
> > >
> > > > For the IntermediateDataSetID I was just thinking that it might actually be
> > > > interesting to know which job produced the result which, by using cluster
> > > > partitions, could be used across different jobs. Not saying that we have to
> > > > do it, though.
> > > >
> > > > A small addition to Zhu Zhu's comment about TDD sizes: For the problem with
> > > > too large TDDs there is already an issue [1]. The current suspicion is that
> > > > the size of TDDs for jobs with a large parallelism can indeed become
> > > > problematic for Flink. Hence, it would be great to investigate the impacts
> > > > of the proposed changes.
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-16069
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <ka...@gmail.com> wrote:
> > > >
> > > > > Hi, Zhu,
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > > make JobVertexID a composition of JobID and a topology index
> > > > > I think it is a good idea. However, it seems the JobVertexID is
> > > > > derived from hashcode which used to identify them across submission.
> > > > > I'm not familiar with that component though. I prefer to keep this
> > > > > idea out the scope of this FLIP if no one could help us to figure it
> > > > > out.
> > > > >
> > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > JobVertexID,
> > > > > > but just print the producing relationships in logs? I think keeping
> > > > > > IntermediateDataSetID independent may be better considering the cross
> > > > job
> > > > > > result usages in interactive query cases.
> > > > > I think you are right. I'll keep IntermediateDataSetID independent
> > > > > from JobVertexID.
> > > > >
> > > > > > The new IDs will become larger with this rework.
> > > > > Yes, I also have the same concern. Benchmark is necessary, I'll try to
> > > > > provide one during the implementation phase.
> > > > >
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <re...@gmail.com> wrote:
> > > > > >
> > > > > > Thanks for proposing this improvement Yangze. Big +1 for the overall
> > > > > > proposal. It can help a lot in troubleshooting.
> > > > > >
> > > > > > Here are a few questions for it:
> > > > > > 1. Shall we make JobVertexID a composition of JobID and a topology
> > > > index?
> > > > > > This would help in the session cluster case, so that we can identify
> > > > > which
> > > > > > tasks are from which jobs along with the rework of ExecutionAttemptID.
> > > > > >
> > > > > > 2. You mentioned that "Add the producer info to the string literal of
> > > > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > > > composition of JobVertexID and a consumer index?
> > > > > > How about we still keep IntermediateDataSetID independent from
> > > > > JobVertexID,
> > > > > > but just print the producing relationships in logs? I think keeping
> > > > > > IntermediateDataSetID independent may be better considering the cross
> > > > job
> > > > > > result usages in interactive query cases.
> > > > > >
> > > > > > 3. The new IDs will become larger with this rework. The
> > > > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > > composed
> > > > > > of a variety DIs. I'm not sure how much it would be but there can be
> > > > more
> > > > > > memory and CPU cost for it, and results in more frequent GCs, message
> > > > > size
> > > > > > exceeding akka frame limits, and a longer blocked time of main thread.
> > > > > > This should not be a problem in most cases but might be a problem for
> > > > > large
> > > > > > scale jobs. Shall we have an benchmark for it?
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu Zhu
> > > > > >
> > > > > > Yangze Guo <ka...@gmail.com> 于2020年3月31日周二 下午2:19写道:
> > > > > >
> > > > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > > > >
> > > > > > > @Till
> > > > > > > I'm +1 for your two ideas and I'd like to move these two out of the
> > > > > > > scope of this FLIP since the pipelined region scheduling is an
> > > > ongoing
> > > > > > > work now.
> > > > > > > I also agree that we should not make the InstanceID in
> > > > > > > TaskExecutorConnection being composed of the ResourceID plus a
> > > > > > > monotonically increasing value. Thanks a lot for your explanation.
> > > > > > >
> > > > > > > @Konstantin @Yang
> > > > > > > Regarding the PodName of TaskExecutor on K8s, I second Yang's
> > > > > > > suggestion. It makes sense to me to let user export RESOURCE_ID and
> > > > > > > make TM respect it. User needs to guarantee there is no collision for
> > > > > > > different TM.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <st...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > +1 on allowing user defined resourceId for taskmanager
> > > > > > > >
> > > > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <da...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Konstantin,
> > > > > > > > >
> > > > > > > > > I think it is a good idea. Currently, our users also report a
> > > > > similar
> > > > > > > issue
> > > > > > > > > with
> > > > > > > > > resourceId of standalone cluster. When we start a standalone
> > > > > cluster
> > > > > > > now,
> > > > > > > > > the `TaskManagerRunner` always generates a uuid for the
> > > > > resourceId. It
> > > > > > > will
> > > > > > > > > be used to register to the jobmanager and not convenient to match
> > > > > with
> > > > > > > the
> > > > > > > > > real
> > > > > > > > > taskmanager, especially in container environment.
> > > > > > > > >
> > > > > > > > > I think a probably solution is we could support the user defined
> > > > > > > > > resourceId.
> > > > > > > > > We could get it from the environment. For standalone on K8s, we
> > > > > could
> > > > > > > set
> > > > > > > > > the "RESOURCE_ID" env to the pod name so that it is easier to
> > > > > match the
> > > > > > > > > taskmanager with K8s pod.
> > > > > > > > >
> > > > > > > > > Moreover, i am afraid we could not set the pod name to the
> > > > > resourceId.
> > > > > > > I
> > > > > > > > > think
> > > > > > > > > you could set the "deployment.meta.name". Since the pod name is
> > > > > > > generated
> > > > > > > > > by
> > > > > > > > > K8s in the pattern {deployment.meta.nane}-{rc.uuid}-{uuid}. On
> > > > the
> > > > > > > > > contrary, we
> > > > > > > > > will set the resourceId to the pod name.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yang
> > > > > > > > >
> > > > > > > > > Konstantin Knauf <ko...@ververica.com> 于2020年3月29日周日
> > > > > 下午8:06写道:
> > > > > > > > >
> > > > > > > > > > Hi Yangze, Hi Till,
> > > > > > > > > >
> > > > > > > > > > thanks you for working on this topic. I believe it will make
> > > > > > > debugging
> > > > > > > > > > large Apache Flink deployments much more feasible.
> > > > > > > > > >
> > > > > > > > > > I was wondering whether it would make sense to allow the user
> > > > to
> > > > > > > specify
> > > > > > > > > > the Resource ID in standalone setups?  For example, many users
> > > > > still
> > > > > > > > > > implicitly use standalone clusters on Kubernetes (the native
> > > > > support
> > > > > > > is
> > > > > > > > > > still experimental) and in these cases it would be interesting
> > > > to
> > > > > > > also
> > > > > > > > > set
> > > > > > > > > > the PodName as the ResourceID. What do you think?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Kosntantin
> > > > > > > > > >
> > > > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <
> > > > > trohrmann@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Yangze,
> > > > > > > > > > >
> > > > > > > > > > > thanks for creating this FLIP. I think it is a very good
> > > > > > > improvement
> > > > > > > > > > > helping our users and ourselves understanding better what's
> > > > > going
> > > > > > > on in
> > > > > > > > > > > Flink.
> > > > > > > > > > >
> > > > > > > > > > > Creating the ResourceIDs with host information/pod name is a
> > > > > good
> > > > > > > idea.
> > > > > > > > > > >
> > > > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a
> > > > > good
> > > > > > > idea.
> > > > > > > > > > >
> > > > > > > > > > > The InstanceID is used for fencing purposes. I would not make
> > > > > it a
> > > > > > > > > > > composition of the ResourceID + a monotonically increasing
> > > > > number.
> > > > > > > The
> > > > > > > > > > > problem is that in case of a RM failure the InstanceIDs would
> > > > > start
> > > > > > > > > from
> > > > > > > > > > 0
> > > > > > > > > > > again and this could lead to collisions.
> > > > > > > > > > >
> > > > > > > > > > > Logging more information on how the different runtime IDs are
> > > > > > > > > correlated
> > > > > > > > > > is
> > > > > > > > > > > also a good idea.
> > > > > > > > > > >
> > > > > > > > > > > Two other ideas for simplifying the ids are the following:
> > > > > > > > > > >
> > > > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a
> > > > > > > separate
> > > > > > > > > > > RpcEndpoint a while ago. With this no longer being the case I
> > > > > > > think we
> > > > > > > > > > > could remove the SlotRequestID and replace it with the
> > > > > > > AllocationID.
> > > > > > > > > > > * Instead of creating new SlotRequestIDs for multi task slots
> > > > > one
> > > > > > > could
> > > > > > > > > > > derive them from the SlotRequestID used for requesting the
> > > > > > > underlying
> > > > > > > > > > > AllocatedSlot.
> > > > > > > > > > >
> > > > > > > > > > > Given that the slot sharing logic will most likely be
> > > > reworked
> > > > > > > with the
> > > > > > > > > > > pipelined region scheduling, we might be able to resolve
> > > > these
> > > > > two
> > > > > > > > > points
> > > > > > > > > > > as part of the pipelined region scheduling effort.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Till
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <
> > > > > karmagyz@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > >
> > > > > > > > > > > > We would like to start a discussion thread on "FLIP-118:
> > > > > Improve
> > > > > > > > > > > > Flink’s ID system"[1].
> > > > > > > > > > > >
> > > > > > > > > > > > This FLIP mainly discusses the following issues, target to
> > > > > > > enhance
> > > > > > > > > the
> > > > > > > > > > > > readability of IDs in log and help user to debug in case of
> > > > > > > failures:
> > > > > > > > > > > >
> > > > > > > > > > > > - Enhance the readability of the string literals of IDs.
> > > > > Most of
> > > > > > > them
> > > > > > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do not
> > > > provide
> > > > > much
> > > > > > > > > > > > meaningful information and are hard to recognize and
> > > > compare
> > > > > for
> > > > > > > > > > > > users.
> > > > > > > > > > > > - Log the ID’s lineage information to make debugging more
> > > > > > > convenient.
> > > > > > > > > > > > Currently, the log fails to always show the lineage
> > > > > information
> > > > > > > > > > > > between IDs. Finding out relationships between entities
> > > > > > > identified by
> > > > > > > > > > > > given IDs is a common demand, e.g., slot of which
> > > > > AllocationID is
> > > > > > > > > > > > assigned to satisfy slot request of with SlotRequestID.
> > > > > Absence
> > > > > > > of
> > > > > > > > > > > > such lineage information, it’s impossible to track the end
> > > > > to end
> > > > > > > > > > > > lifecycle of an Execution or a Task now, which makes
> > > > > debugging
> > > > > > > > > > > > difficult.
> > > > > > > > > > > >
> > > > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > > > >
> > > > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > > > - Log the ID’s lineage information
> > > > > > > > > > > > - Expose the identifier of distributing component to user
> > > > > > > > > > > >
> > > > > > > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > Looking
> > > > > > > > > forward
> > > > > > > > > > > to
> > > > > > > > > > > > your feedbacks.
> > > > > > > > > > > >
> > > > > > > > > > > > [1]
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Yangze Guo
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Konstantin Knauf | Head of Product
> > > > > > > > > >
> > > > > > > > > > +49 160 91394525
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/
> > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Join Flink Forward <https://flink-forward.org/> - The Apache
> > > > > Flink
> > > > > > > > > > Conference
> > > > > > > > > >
> > > > > > > > > > Stream Processing | Event Driven | Real Time
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > >
> > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Ververica GmbH
> > > > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung
> > > > > Jason,
> > > > > > > Ji
> > > > > > > > > > (Tony) Cheng
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > > >

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Hi everyone,
After an offline discussion with ZhuZhu, I have some comments on this
investigation.

Regarding the drop in maximum parallelism from 760 to 685, it may be
because the tasks are not scheduled evenly. The result is inconsistent
across multiple experiments, so this phenomenon is likely unrelated to
our changes.

I think what we really care about is the Akka frame size, i.e., whether
users would need to enlarge it for the same job after our change. The
serialized size of the TDD seems to be smaller after the change. I don't
know the root cause of this phenomenon at the moment. The way I measure it is:
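```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

// `deployment` is the TaskDeploymentDescriptor being measured and `LOG` is the
// surrounding class's logger; writeObject may throw IOException.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(deployment);
oos.flush();
LOG.info("BENCHMARK TDD_SERIAL_SIZE {}.", bos.toByteArray().length);
```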
Please correct me if I'm wrong.
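In case it helps others reproduce the numbers, the measurement above could be wrapped in a small reusable helper. This is only a sketch: the class name SerializedSize is hypothetical and not part of Flink. It closes the ObjectOutputStream via try-with-resources, which also flushes it, so the buffer is complete when its size is read. Note that plain Java serialization also counts the stream header and class descriptors, so the result is larger than the raw size of the object's fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: size of an object under plain Java serialization. */
public final class SerializedSize {

    private SerializedSize() {}

    public static int of(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the stream, which also flushes any buffered
        // block data into bos before we read its size.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The count includes the stream header and class descriptors,
        // not just the object's field values.
        return bos.size();
    }

    public static void main(String[] args) {
        int small = of(new int[10]);
        int large = of(new int[100]);
        // Only the 90 extra 4-byte elements differ between the two arrays.
        System.out.println("int[100] - int[10] = " + (large - small) + " bytes");
    }
}
```

For example, an int[100] serializes to exactly 360 bytes more than an int[10], because the descriptor overhead is identical and only the 90 extra elements differ.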

I'll experiment with higher parallelism to see if there is any
regression in the Akka frame size.

Regarding the TDD building time, the parallelism in my investigation
might be too small. Also, this time might be influenced by many
factors. Thus, I'll
- experiment with higher parallelism.
- measure the time spent from "Starting scheduling" until the last task
switches to RUNNING.
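The second measurement could be scripted against the JobManager log. The sketch below is a hypothetical helper, not part of Flink. It assumes that each log line starts with a timestamp in the format yyyy-MM-dd HH:mm:ss,SSS and that the relevant lines contain the markers "Starting scheduling" and "to RUNNING". Both assumptions should be checked against the actual log4j pattern and log messages of the Flink version under test.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Hypothetical sketch: time from "Starting scheduling" until the last task
// switched to RUNNING, computed from JobManager log lines.
// ASSUMPTIONS: lines start with a "yyyy-MM-dd HH:mm:ss,SSS" timestamp and the
// markers "Starting scheduling" / "to RUNNING" identify the relevant lines.
public final class SchedulingLatency {

    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    private SchedulingLatency() {}

    private static LocalDateTime timestampOf(String line) {
        // The timestamp is the first 23 characters, e.g. "2020-04-10 12:53:00,123".
        return LocalDateTime.parse(line.substring(0, 23), TS);
    }

    public static Duration measure(List<String> logLines) {
        LocalDateTime start = null;
        LocalDateTime lastRunning = null;
        for (String line : logLines) {
            if (start == null && line.contains("Starting scheduling")) {
                start = timestampOf(line);
            } else if (start != null && line.contains("to RUNNING")) {
                // Keep the latest RUNNING transition seen after scheduling started.
                LocalDateTime t = timestampOf(line);
                if (lastRunning == null || t.isAfter(lastRunning)) {
                    lastRunning = t;
                }
            }
        }
        if (start == null || lastRunning == null) {
            throw new IllegalArgumentException("markers not found in log");
        }
        return Duration.between(start, lastRunning);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2020-04-10 12:53:00,100 INFO  Starting scheduling with scheduling strategy [...]",
                "2020-04-10 12:53:01,250 INFO  Map (1/2) switched from DEPLOYING to RUNNING.",
                "2020-04-10 12:53:02,600 INFO  Map (2/2) switched from DEPLOYING to RUNNING.");
        System.out.println(measure(lines).toMillis() + " ms"); // prints "2500 ms"
    }
}
```

Taking the latest RUNNING timestamp rather than the last matching line keeps the result correct even if log lines from different threads are slightly out of order.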

Best,
Yangze Guo



Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Hi there,

Sorry for the belated reply. I have just made a preliminary investigation
of the impact of refactoring IntermediateResultPartitionID.

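The key change is to compose it of an IntermediateDataSetID and a
partitionNum:

public class IntermediateResultPartitionID {
    private final IntermediateDataSetID intermediateDataSetID; // data set the partition belongs to
    private final int partitionNum; // index of the partition within that data set
    public IntermediateResultPartitionID(IntermediateDataSetID dataSetId, int partitionNum) {
        this.intermediateDataSetID = dataSetId;
        this.partitionNum = partitionNum;
    }
}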

For this benchmark, I used examples/streaming/WordCount.jar as the test
job and ran Flink on YARN. All configurations were kept at their defaults
except for "taskmanager.numberOfTaskSlots", which was set to 2.

The results show that it does have an impact on performance:
- After this change, the maximum parallelism went from 760 to 685, which
is limited by the total number of network buffers. For the same
parallelism, users need more network buffers than before.
- The Netty messages "PartitionRequest" and "TaskEventRequest" each grow
by 4 bytes. For "PartitionRequest", this is a 7% increase.
- The TDD (TaskDeploymentDescriptor) takes longer to construct. With a
parallelism of 600, constructing the TDD takes twice as long as before.
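To make the 4-byte figure concrete, here is a minimal sketch. It assumes the current IntermediateResultPartitionID goes over the wire as a random 128-bit value (two longs, as AbstractID-based IDs are) and the proposed one as the 16-byte IntermediateDataSetID followed by a 4-byte int partition index. The class and method names are hypothetical and this is not Flink's actual serialization code; it only reproduces the arithmetic.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch: compares the wire size of a random 128-bit partition ID
 * with an ID composed of a 128-bit data set ID plus an int partition index.
 * Both layouts are assumptions for illustration, not Flink's serializers.
 */
public class PartitionIdSizeSketch {

    // Old layout (assumed): a random 128-bit ID written as two longs.
    public static byte[] serializeRandomId(long upper, long lower) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(upper)
                .putLong(lower)
                .array();
    }

    // Proposed layout (assumed): the data set's 128-bit ID followed by the partition index.
    public static byte[] serializeComposedId(long dataSetUpper, long dataSetLower, int partitionNum) {
        return ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES)
                .putLong(dataSetUpper)
                .putLong(dataSetLower)
                .putInt(partitionNum)
                .array();
    }

    public static void main(String[] args) {
        int oldSize = serializeRandomId(1L, 2L).length;
        int newSize = serializeComposedId(1L, 2L, 7).length;
        System.out.println("old=" + oldSize + " bytes, new=" + newSize
                + " bytes, delta=" + (newSize - oldSize) + " bytes");
    }
}
```

Every message that carries such an ID pays the extra int, which is why the cost shows up in both PartitionRequest and the TDD.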

The detailed records are in
https://docs.google.com/spreadsheets/d/13lt6J29uikWoux79047lkvbGi2fS4ioWHSbvT1kTL2M/edit?usp=sharing

The same issue could occur with ExecutionAttemptID, which would increase
"PartitionRequest" and "TaskEventRequest" by 8 bytes (subtaskIndex and
attemptNumber). However, it may not affect the TDD as much as
IntermediateResultPartitionID does, since there is only one
ExecutionAttemptID per TDD.
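As a hypothetical illustration of the ExecutionAttemptID variant: the sketch below assumes the ID keeps a random 128-bit part and appends subtaskIndex and attemptNumber as two ints, which accounts for the 8 extra bytes, and shows how the string literal could then expose the subtask and attempt directly in logs. The class name, field layout and the `<hex>_<subtask>_<attempt>` format are all assumptions, not Flink's API.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch of an ExecutionAttemptID carrying its subtask index and
 * attempt number next to a random 128-bit part. Names and layout are assumptions.
 */
public final class ComposedAttemptIdSketch {
    private final long baseUpper;
    private final long baseLower;
    private final int subtaskIndex;
    private final int attemptNumber;

    public ComposedAttemptIdSketch(long baseUpper, long baseLower, int subtaskIndex, int attemptNumber) {
        this.baseUpper = baseUpper;
        this.baseLower = baseLower;
        this.subtaskIndex = subtaskIndex;
        this.attemptNumber = attemptNumber;
    }

    // Wire form: 16 bytes for the 128-bit part plus two 4-byte ints (8 bytes more than a bare ID).
    public byte[] serialize() {
        return ByteBuffer.allocate(2 * Long.BYTES + 2 * Integer.BYTES)
                .putLong(baseUpper)
                .putLong(baseLower)
                .putInt(subtaskIndex)
                .putInt(attemptNumber)
                .array();
    }

    // Readable form: subtask and attempt are visible in logs without any lookup.
    @Override
    public String toString() {
        return String.format("%016x%016x_%d_%d", baseUpper, baseLower, subtaskIndex, attemptNumber);
    }

    public static void main(String[] args) {
        ComposedAttemptIdSketch id = new ComposedAttemptIdSketch(0xabcL, 0x123L, 3, 1);
        System.out.println(id + " -> " + id.serialize().length + " bytes");
    }
}
```

The two appended ints are exactly what buys the readability and exactly what costs the 8 bytes per message, which is the trade-off weighed here.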

Based on this preliminary investigation, I tend not to refactor
ExecutionAttemptID and IntermediateResultPartitionID at the moment, or
to treat it as future work.

WDYT? @ZhuZhu @Till

Best,
Yangze Guo

On Wed, Apr 1, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
>
> >> However, it seems the JobVertexID is derived from hashcode ...
> You are right. JobVertexID is widely used and reworking it may affect the
> public interfaces, e.g. the REST API. We can take it as a long-term goal and
> exclude it from this FLIP.
> The same applies to IntermediateDataSetID, which could also be composed of a
> JobID and an index, as Till proposed.
>
> Thanks,
> Zhu Zhu
>
> On Tue, Mar 31, 2020 at 8:36 PM Till Rohrmann <tr...@apache.org> wrote:
>
> > For the IntermediateDataSetID I was just thinking that it might actually be
> > interesting to know which job produced the result which, by using cluster
> > partitions, could be used across different jobs. Not saying that we have to
> > do it, though.
> >
> > A small addition to Zhu Zhu's comment about TDD sizes: For the problem with
> > too large TDDs there is already an issue [1]. The current suspicion is that
> > the size of TDDs for jobs with a large parallelism can indeed become
> > problematic for Flink. Hence, it would be great to investigate the impacts
> > of the proposed changes.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16069
> >
> > Cheers,
> > Till
> >
> > On Tue, Mar 31, 2020 at 11:50 AM Yangze Guo <ka...@gmail.com> wrote:
> >
> > > Hi, Zhu,
> > >
> > > Thanks for the feedback.
> > >
> > > > make JobVertexID a composition of JobID and a topology index
> > > I think it is a good idea. However, it seems the JobVertexID is
> > > derived from a hashcode which is used to identify vertices across
> > > submissions. I'm not familiar with that component, though. I prefer to
> > > keep this idea out of the scope of this FLIP unless someone can help us
> > > figure it out.
> > >
> > > > How about we still keep IntermediateDataSetID independent from
> > > > JobVertexID, but just print the producing relationships in logs? I
> > > > think keeping IntermediateDataSetID independent may be better
> > > > considering the cross-job result usages in interactive query cases.
> > > I think you are right. I'll keep IntermediateDataSetID independent
> > > from JobVertexID.
> > >
> > > > The new IDs will become larger with this rework.
> > > Yes, I also have the same concern. A benchmark is necessary; I'll try to
> > > provide one during the implementation phase.
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Tue, Mar 31, 2020 at 4:55 PM Zhu Zhu <re...@gmail.com> wrote:
> > > >
> > > > Thanks for proposing this improvement, Yangze. Big +1 for the overall
> > > > proposal. It can help a lot in troubleshooting.
> > > >
> > > > Here are a few questions about it:
> > > > 1. Shall we make JobVertexID a composition of JobID and a topology index?
> > > > This would help in the session cluster case, so that we can identify
> > > > which tasks are from which jobs, along with the rework of
> > > > ExecutionAttemptID.
> > > >
> > > > 2. You mentioned "Add the producer info to the string literal of
> > > > IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
> > > > composition of JobVertexID and a consumer index?
> > > > How about we still keep IntermediateDataSetID independent from
> > > > JobVertexID, but just print the producing relationships in logs? I think
> > > > keeping IntermediateDataSetID independent may be better considering the
> > > > cross-job result usages in interactive query cases.
> > > >
> > > > 3. The new IDs will become larger with this rework. The
> > > > TaskDeploymentDescriptor can become much larger since it is mainly
> > > > composed of a variety of IDs. I'm not sure how much larger it would be,
> > > > but there can be more memory and CPU cost for it, resulting in more
> > > > frequent GCs, message sizes exceeding the Akka frame limit, and a longer
> > > > blocking time of the main thread.
> > > > This should not be a problem in most cases but might be a problem for
> > > > large-scale jobs. Shall we have a benchmark for it?
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > On Tue, Mar 31, 2020 at 2:19 PM Yangze Guo <ka...@gmail.com> wrote:
> > > >
> > > > > Thank you all for the feedback! Sorry for the belated reply.
> > > > >
> > > > > @Till
> > > > > I'm +1 for your two ideas, and I'd like to move them out of the scope
> > > > > of this FLIP since the pipelined region scheduling is ongoing work.
> > > > > I also agree that we should not make the InstanceID in
> > > > > TaskExecutorConnection a composition of the ResourceID plus a
> > > > > monotonically increasing value. Thanks a lot for your explanation.
> > > > >
> > > > > @Konstantin @Yang
> > > > > Regarding the PodName of TaskExecutor on K8s, I second Yang's
> > > > > suggestion. It makes sense to me to let users export RESOURCE_ID and
> > > > > make the TM respect it. Users need to guarantee that there are no
> > > > > collisions between different TMs.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > >
> > > > > On Tue, Mar 31, 2020 at 12:25 AM Steven Wu <st...@gmail.com> wrote:
> > > > > >
> > > > > > +1 on allowing user defined resourceId for taskmanager
> > > > > >
> > > > > > On Sun, Mar 29, 2020 at 7:24 PM Yang Wang <da...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Konstantin,
> > > > > > >
> > > > > > > I think it is a good idea. Currently, our users also report a
> > > similar
> > > > > issue
> > > > > > > with
> > > > > > > resourceId of standalone cluster. When we start a standalone
> > > cluster
> > > > > now,
> > > > > > > the `TaskManagerRunner` always generates a uuid for the
> > > resourceId. It
> > > > > will
> > > > > > > be used to register to the jobmanager and not convenient to match
> > > with
> > > > > the
> > > > > > > real
> > > > > > > taskmanager, especially in container environment.
> > > > > > >
> > > > > > > I think a probably solution is we could support the user defined
> > > > > > > resourceId.
> > > > > > > We could get it from the environment. For standalone on K8s, we
> > > could
> > > > > set
> > > > > > > the "RESOURCE_ID" env to the pod name so that it is easier to
> > > match the
> > > > > > > taskmanager with K8s pod.
> > > > > > >
> > > > > > > Moreover, i am afraid we could not set the pod name to the
> > > resourceId.
> > > > > I
> > > > > > > think
> > > > > > > you could set the "deployment.meta.name". Since the pod name is
> > > > > generated
> > > > > > > by
> > > > > > > K8s in the pattern {deployment.meta.nane}-{rc.uuid}-{uuid}. On
> > the
> > > > > > > contrary, we
> > > > > > > will set the resourceId to the pod name.
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yang
> > > > > > >
> > > > > > > Konstantin Knauf <ko...@ververica.com> 于2020年3月29日周日
> > > 下午8:06写道:
> > > > > > >
> > > > > > > > Hi Yangze, Hi Till,
> > > > > > > >
> > > > > > > > thanks you for working on this topic. I believe it will make
> > > > > debugging
> > > > > > > > large Apache Flink deployments much more feasible.
> > > > > > > >
> > > > > > > > I was wondering whether it would make sense to allow the user
> > to
> > > > > specify
> > > > > > > > the Resource ID in standalone setups?  For example, many users
> > > still
> > > > > > > > implicitly use standalone clusters on Kubernetes (the native
> > > support
> > > > > is
> > > > > > > > still experimental) and in these cases it would be interesting
> > to
> > > > > also
> > > > > > > set
> > > > > > > > the PodName as the ResourceID. What do you think?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Kosntantin
> > > > > > > >
> > > > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <
> > > trohrmann@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Yangze,
> > > > > > > > >
> > > > > > > > > thanks for creating this FLIP. I think it is a very good
> > > > > improvement
> > > > > > > > > helping our users and ourselves understanding better what's
> > > going
> > > > > on in
> > > > > > > > > Flink.
> > > > > > > > >
> > > > > > > > > Creating the ResourceIDs with host information/pod name is a
> > > good
> > > > > idea.
> > > > > > > > >
> > > > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a
> > > good
> > > > > idea.
> > > > > > > > >
> > > > > > > > > The InstanceID is used for fencing purposes. I would not make
> > > it a
> > > > > > > > > composition of the ResourceID + a monotonically increasing
> > > number.
> > > > > The
> > > > > > > > > problem is that in case of a RM failure the InstanceIDs would
> > > start
> > > > > > > from
> > > > > > > > 0
> > > > > > > > > again and this could lead to collisions.
> > > > > > > > >
> > > > > > > > > Logging more information on how the different runtime IDs are
> > > > > > > correlated
> > > > > > > > is
> > > > > > > > > also a good idea.
> > > > > > > > >
> > > > > > > > > Two other ideas for simplifying the ids are the following:
> > > > > > > > >
> > > > > > > > > * The SlotRequestID was introduced because the SlotPool was a
> > > > > separate
> > > > > > > > > RpcEndpoint a while ago. With this no longer being the case I
> > > > > think we
> > > > > > > > > could remove the SlotRequestID and replace it with the
> > > > > AllocationID.
> > > > > > > > > * Instead of creating new SlotRequestIDs for multi task slots
> > > one
> > > > > could
> > > > > > > > > derive them from the SlotRequestID used for requesting the
> > > > > underlying
> > > > > > > > > AllocatedSlot.
> > > > > > > > >
> > > > > > > > > Given that the slot sharing logic will most likely be
> > reworked
> > > > > with the
> > > > > > > > > pipelined region scheduling, we might be able to resolve
> > these
> > > two
> > > > > > > points
> > > > > > > > > as part of the pipelined region scheduling effort.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Till
> > > > > > > > >
> > > > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <
> > > karmagyz@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > We would like to start a discussion thread on "FLIP-118: Improve
> > > > > > > > > > Flink’s ID system"[1].
> > > > > > > > > >
> > > > > > > > > > This FLIP mainly discusses the following issues, aiming to
> > > > > > > > > > enhance the readability of IDs in logs and help users debug in
> > > > > > > > > > case of failures:
> > > > > > > > > >
> > > > > > > > > > - Enhance the readability of the string literals of IDs. Most of
> > > > > > > > > > them are hashcodes, e.g. ExecutionAttemptID, which do not provide
> > > > > > > > > > much meaningful information and are hard for users to recognize
> > > > > > > > > > and compare.
> > > > > > > > > > - Log the IDs' lineage information to make debugging more
> > > > > > > > > > convenient. Currently, the log does not always show the lineage
> > > > > > > > > > information between IDs. Finding out the relationships between
> > > > > > > > > > entities identified by given IDs is a common demand, e.g., which
> > > > > > > > > > slot (identified by its AllocationID) is assigned to satisfy the
> > > > > > > > > > slot request with a given SlotRequestID. Without such lineage
> > > > > > > > > > information, it is currently impossible to track the end-to-end
> > > > > > > > > > lifecycle of an Execution or a Task, which makes debugging
> > > > > > > > > > difficult.
> > > > > > > > > >
> > > > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > > > >
> > > > > > > > > > - Add location information to distributed components
> > > > > > > > > > - Add topology information to graph components
> > > > > > > > > > - Log the IDs' lineage information
> > > > > > > > > > - Expose the identifiers of distributed components to users
> > > > > > > > > >
> > > > > > > > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > > > > > > > forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yangze Guo
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Zhu Zhu <re...@gmail.com>.
>> However, it seems the JobVertexID is derived from hashcode ...
You are right. JobVertexID is widely used and reworking it may affect the
public interfaces, e.g. the REST API. We can take it as a long-term goal and
exclude it from this FLIP.
The same applies to IntermediateDataSetID, which could also be composed of a
JobID and an index, as Till proposed.
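A minimal sketch of the JobID-plus-index composition, assuming a JobID is a 128-bit value held as two longs and the index is unique only within one job. The class is hypothetical, not Flink's IntermediateDataSetID. It shows the property Till raised for cluster partitions shared across jobs: the producing job can be read straight off the ID, and equal indices in different jobs still yield distinct IDs.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of an IntermediateDataSetID composed of the producing
 * job's 128-bit ID and a per-job index. Not Flink's actual class.
 */
public final class ComposedDataSetIdSketch {
    private final long jobUpper;
    private final long jobLower;
    private final int index; // assumed unique only within one job

    public ComposedDataSetIdSketch(long jobUpper, long jobLower, int index) {
        this.jobUpper = jobUpper;
        this.jobLower = jobLower;
        this.index = index;
    }

    // The producing job is recoverable from the ID itself.
    public String producingJob() {
        return String.format("%016x%016x", jobUpper, jobLower);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ComposedDataSetIdSketch)) {
            return false;
        }
        ComposedDataSetIdSketch other = (ComposedDataSetIdSketch) o;
        return jobUpper == other.jobUpper && jobLower == other.jobLower && index == other.index;
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobUpper, jobLower, index);
    }

    @Override
    public String toString() {
        return producingJob() + "#" + index;
    }

    public static void main(String[] args) {
        ComposedDataSetIdSketch a = new ComposedDataSetIdSketch(1L, 2L, 0);
        ComposedDataSetIdSketch b = new ComposedDataSetIdSketch(3L, 4L, 0);
        // Same index in different jobs still yields distinct IDs.
        System.out.println(a + " vs " + b + ", equal=" + a.equals(b));
    }
}
```

Uniqueness here rests entirely on JobIDs being unique, so the index only needs to be a per-job counter.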

Thanks,
Zhu Zhu

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Till Rohrmann <tr...@apache.org>.
For the IntermediateDataSetID I was just thinking that it might actually be
interesting to know which job produced the result which, by using cluster
partitions, could be used across different jobs. Not saying that we have to
do it, though.

A small addition to Zhu Zhu's comment about TDD sizes: there is already an
issue [1] about overly large TDDs. The current suspicion is that the size of
TDDs for jobs with a large parallelism can indeed become problematic for
Flink. Hence, it would be great to investigate the impact of the proposed
changes.

[1] https://issues.apache.org/jira/browse/FLINK-16069

Cheers,
Till

> > > debugging
> > > > > > large Apache Flink deployments much more feasible.
> > > > > >
> > > > > > I was wondering whether it would make sense to allow the user to
> > > specify
> > > > > > the Resource ID in standalone setups?  For example, many users
> still
> > > > > > implicitly use standalone clusters on Kubernetes (the native
> support
> > > is
> > > > > > still experimental) and in these cases it would be interesting to
> > > also
> > > > > set
> > > > > > the PodName as the ResourceID. What do you think?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Kosntantin
> > > > > >
> > > > > > On Thu, Mar 26, 2020 at 6:49 PM Till Rohrmann <
> trohrmann@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yangze,
> > > > > > >
> > > > > > > thanks for creating this FLIP. I think it is a very good
> > > improvement
> > > > > > > helping our users and ourselves understanding better what's
> going
> > > on in
> > > > > > > Flink.
> > > > > > >
> > > > > > > Creating the ResourceIDs with host information/pod name is a
> good
> > > idea.
> > > > > > >
> > > > > > > Also deriving ExecutionGraph IDs from their superset ID is a
> good
> > > idea.
> > > > > > >
> > > > > > > The InstanceID is used for fencing purposes. I would not make
> it a
> > > > > > > composition of the ResourceID + a monotonically increasing
> number.
> > > The
> > > > > > > problem is that in case of a RM failure the InstanceIDs would
> start
> > > > > from
> > > > > > 0
> > > > > > > again and this could lead to collisions.
> > > > > > >
> > > > > > > Logging more information on how the different runtime IDs are
> > > > > correlated
> > > > > > is
> > > > > > > also a good idea.
> > > > > > >
> > > > > > > Two other ideas for simplifying the ids are the following:
> > > > > > >
> > > > > > > * The SlotRequestID was introduced because the SlotPool was a
> > > separate
> > > > > > > RpcEndpoint a while ago. With this no longer being the case I
> > > think we
> > > > > > > could remove the SlotRequestID and replace it with the
> > > AllocationID.
> > > > > > > * Instead of creating new SlotRequestIDs for multi task slots
> one
> > > could
> > > > > > > derive them from the SlotRequestID used for requesting the
> > > underlying
> > > > > > > AllocatedSlot.
> > > > > > >
> > > > > > > Given that the slot sharing logic will most likely be reworked
> > > with the
> > > > > > > pipelined region scheduling, we might be able to resolve these
> two
> > > > > points
> > > > > > > as part of the pipelined region scheduling effort.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <
> karmagyz@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > We would like to start a discussion thread on "FLIP-118:
> Improve
> > > > > > > > Flink’s ID system"[1].
> > > > > > > >
> > > > > > > > This FLIP mainly discusses the following issues, target to
> > > enhance
> > > > > the
> > > > > > > > readability of IDs in log and help user to debug in case of
> > > failures:
> > > > > > > >
> > > > > > > > - Enhance the readability of the string literals of IDs.
> Most of
> > > them
> > > > > > > > are hashcodes, e.g. ExecutionAttemptID, which do not provide
> much
> > > > > > > > meaningful information and are hard to recognize and compare
> for
> > > > > > > > users.
> > > > > > > > - Log the ID’s lineage information to make debugging more
> > > convenient.
> > > > > > > > Currently, the log fails to always show the lineage
> information
> > > > > > > > between IDs. Finding out relationships between entities
> > > identified by
> > > > > > > > given IDs is a common demand, e.g., slot of which
> AllocationID is
> > > > > > > > assigned to satisfy slot request of with SlotRequestID.
> Absence
> > > of
> > > > > > > > such lineage information, it’s impossible to track the end
> to end
> > > > > > > > lifecycle of an Execution or a Task now, which makes
> debugging
> > > > > > > > difficult.
> > > > > > > >
> > > > > > > > Key changes proposed in the FLIP are as follows:
> > > > > > > >
> > > > > > > > - Add location information to distributed components
> > > > > > > > - Add topology information to graph components
> > > > > > > > - Log the ID’s lineage information
> > > > > > > > - Expose the identifier of distributing component to user
> > > > > > > >
> > > > > > > > Please find more details in the FLIP wiki document [1].
> Looking
> > > > > forward
> > > > > > > to
> > > > > > > > your feedbacks.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yangze Guo
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Konstantin Knauf | Head of Product
> > > > > >
> > > > > > +49 160 91394525
> > > > > >
> > > > > >
> > > > > > Follow us @VervericaData Ververica <https://www.ververica.com/>
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Join Flink Forward <https://flink-forward.org/> - The Apache
> Flink
> > > > > > Conference
> > > > > >
> > > > > > Stream Processing | Event Driven | Real Time
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > > >
> > > > > > --
> > > > > > Ververica GmbH
> > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung
> Jason,
> > > Ji
> > > > > > (Tony) Cheng
> > > > > >
> > > > >
> > >
>

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Hi, Zhu,

Thanks for the feedback.

> make JobVertexID a composition of JobID and a topology index
I think it is a good idea. However, it seems that JobVertexID is
derived from a hash code that is used to identify vertices across
submissions. I'm not familiar with that component, though, so I'd prefer
to keep this idea out of the scope of this FLIP unless someone can help
us figure it out.
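For illustration only, a composed JobVertexID along the lines Zhu suggested might look like the sketch below. `ComposedJobVertexId` is a hypothetical class, not Flink code, and `java.util.UUID` stands in for JobID. Note that this naive version ignores the concern above: it would no longer be a stable hash that matches vertices across submissions.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical composite vertex ID: the owning job's ID plus the vertex's
// position in the job graph's topological order. UUID stands in for JobID.
final class ComposedJobVertexId {
    private final UUID jobId;
    private final int topologyIndex;

    ComposedJobVertexId(UUID jobId, int topologyIndex) {
        if (topologyIndex < 0) {
            throw new IllegalArgumentException("topologyIndex must be >= 0");
        }
        this.jobId = Objects.requireNonNull(jobId, "jobId");
        this.topologyIndex = topologyIndex;
    }

    UUID getJobId() {
        return jobId;
    }

    int getTopologyIndex() {
        return topologyIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ComposedJobVertexId)) {
            return false;
        }
        ComposedJobVertexId other = (ComposedJobVertexId) o;
        return topologyIndex == other.topologyIndex && jobId.equals(other.jobId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, topologyIndex);
    }

    // "<jobId>_<index>": in a session cluster, a log reader can see which
    // job a task belongs to directly from the vertex ID.
    @Override
    public String toString() {
        return jobId + "_" + topologyIndex;
    }
}
```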

> How about we still keep IntermediateDataSetID independent from JobVertexID,
> but just print the producing relationships in logs? I think keeping
> IntermediateDataSetID independent may be better considering the cross job
> result usages in interactive query cases.
I think you are right. I'll keep IntermediateDataSetID independent
from JobVertexID.
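A minimal sketch of that alternative, assuming IntermediateDataSetID stays an independent random ID and the producer relationship appears only in the log line. `LineageLog` and its message format are hypothetical, and `UUID` stands in for IntermediateDataSetID.

```java
import java.util.UUID;

// Hypothetical helper: the data set ID is not derived from the producer,
// so results can still be shared across jobs; the lineage is only logged.
final class LineageLog {
    private LineageLog() {}

    // Builds the message; pass it to whatever logger the component uses.
    static String producedBy(UUID intermediateDataSetId, String producerJobVertexId) {
        return String.format(
            "IntermediateDataSet %s is produced by JobVertex %s",
            intermediateDataSetId, producerJobVertexId);
    }
}
```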

> The new IDs will become larger with this rework.
Yes, I have the same concern. A benchmark is necessary; I'll try to
provide one during the implementation phase.


Best,
Yangze Guo


Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Zhu Zhu <re...@gmail.com>.
Thanks for proposing this improvement, Yangze. Big +1 for the overall
proposal. It can help a lot in troubleshooting.

Here are a few questions about it:
1. Shall we make JobVertexID a composition of JobID and a topology index?
This would help in the session cluster case, so that we can identify which
tasks are from which jobs along with the rework of ExecutionAttemptID.

2. You mentioned that "Add the producer info to the string literal of
IntermediateDataSetID". Do you mean to make IntermediateDataSetID a
composition of JobVertexID and a consumer index?
How about we still keep IntermediateDataSetID independent from JobVertexID,
but just print the producing relationships in logs? I think keeping
IntermediateDataSetID independent may be better given cross-job
result usage in interactive query cases.

3. The new IDs will become larger with this rework. The
TaskDeploymentDescriptor can become much larger since it is mainly composed
of a variety of IDs. I'm not sure how much larger it would be, but there can
be more memory and CPU cost for it, resulting in more frequent GCs, message
sizes exceeding the Akka frame size limit, and longer blocking of the main
thread. This should not be a problem in most cases but might be one for
large-scale jobs. Shall we have a benchmark for it?
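A back-of-the-envelope sketch of the growth described in point 3. It assumes, hypothetically, that a plain random ID is 128 bits (two longs) and that a reworked ID embeds a 128-bit vertex ID plus an int subtask index and an int attempt number; the actual encoding may differ, and `IdSizeEstimate` is not Flink code.

```java
import java.nio.ByteBuffer;

// Compares serialized sizes of two assumed ID layouts. Both layouts are
// illustrations, not Flink's actual wire format.
final class IdSizeEstimate {
    private IdSizeEstimate() {}

    // A plain random 128-bit ID, written as two longs (16 bytes).
    static byte[] plainId(long upper, long lower) {
        return ByteBuffer.allocate(16).putLong(upper).putLong(lower).array();
    }

    // A composite ID: 128-bit vertex ID + subtask index + attempt number (24 bytes).
    static byte[] compositeId(long upper, long lower, int subtaskIndex, int attemptNumber) {
        return ByteBuffer.allocate(24)
            .putLong(upper).putLong(lower)
            .putInt(subtaskIndex).putInt(attemptNumber)
            .array();
    }

    // Extra bytes a message carrying idCount such IDs would need.
    static long extraBytes(long idCount) {
        return idCount * (compositeId(0L, 0L, 0, 0).length - plainId(0L, 0L).length);
    }
}
```

Under these assumptions, a deployment message carrying a hypothetical 10,000 such IDs would grow by `extraBytes(10_000)` = 80,000 bytes; only a real benchmark can show how the actual encoding behaves.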

Thanks,
Zhu Zhu


Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yangze Guo <ka...@gmail.com>.
Thank you all for the feedback! Sorry for the belated reply.

@Till
I'm +1 for your two ideas, and I'd like to move both out of the
scope of this FLIP since pipelined region scheduling is ongoing
work.
I also agree that we should not compose the InstanceID in
TaskExecutorConnection of the ResourceID plus a monotonically
increasing value. Thanks a lot for your explanation.
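The collision risk behind Till's objection can be illustrated with a small simulation. It assumes, hypothetically, that the counter lives only in ResourceManager memory: after a failover it restarts from 0, so the same TaskExecutor gets the same InstanceID again, and a stale message tagged with the old ID would pass the fencing check. A random ID avoids this with overwhelming probability. All class names here are invented for the sketch.

```java
import java.util.UUID;

// Hypothetical model of two InstanceID schemes; not Flink code.
interface InstanceIdGenerator {
    String register(String resourceId);
}

final class CounterInstanceIds implements InstanceIdGenerator {
    // Assumed to live only in ResourceManager memory, so it resets on failover.
    private long next = 0;

    @Override
    public String register(String resourceId) {
        return resourceId + "-" + next++;
    }
}

final class RandomInstanceIds implements InstanceIdGenerator {
    @Override
    public String register(String resourceId) {
        // Uniqueness comes from randomness alone; the resource ID is not encoded.
        return UUID.randomUUID().toString();
    }
}

final class FencingDemo {
    private FencingDemo() {}

    // True if the ID issued before a simulated RM restart equals the one issued
    // after it, i.e. a stale message would be mistaken for a current one.
    static boolean staleTokenAccepted(boolean useCounter) {
        String resourceId = "tm-host-1";
        String before = newGenerator(useCounter).register(resourceId);
        // A fresh generator models the restarted ResourceManager.
        String after = newGenerator(useCounter).register(resourceId);
        return before.equals(after);
    }

    private static InstanceIdGenerator newGenerator(boolean useCounter) {
        return useCounter ? new CounterInstanceIds() : new RandomInstanceIds();
    }
}
```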

@Konstantin @Yang
Regarding the PodName of TaskExecutors on K8s, I second Yang's
suggestion. It makes sense to me to let users export RESOURCE_ID and
have the TM respect it. Users need to guarantee that there are no
collisions between different TMs.
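A minimal sketch of the idea, assuming the `RESOURCE_ID` environment variable discussed in this thread (it is not presented here as an existing Flink option): the TaskManager uses the variable if it is set and non-blank, and otherwise falls back to a random UUID as it does today. `ResourceIdResolver` is hypothetical. On standalone Kubernetes, the pod name could be injected into `RESOURCE_ID` through the Downward API (`fieldRef: metadata.name`).

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical resolution of a user-defined TaskManager resource ID.
final class ResourceIdResolver {
    static final String ENV_KEY = "RESOURCE_ID"; // name taken from the thread

    private ResourceIdResolver() {}

    // Takes the environment as a map so it can be tested; in a real
    // process, call resolve(System.getenv()).
    static String resolve(Map<String, String> env) {
        String userDefined = env.get(ENV_KEY);
        if (userDefined != null && !userDefined.trim().isEmpty()) {
            // The user is responsible for uniqueness across TaskManagers.
            return userDefined.trim();
        }
        // Fallback: a random ID, which is unique but hard to match to a pod.
        return UUID.randomUUID().toString();
    }
}
```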

Best,
Yangze Guo



Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Steven Wu <st...@gmail.com>.
+1 on allowing a user-defined resourceId for the TaskManager


Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Yang Wang <da...@gmail.com>.
Hi Konstantin,

I think it is a good idea. Our users have also reported a similar issue
with the resourceId of standalone clusters. When we start a standalone
cluster now, the `TaskManagerRunner` always generates a random UUID as the
resourceId. This ID is used to register with the JobManager, which makes it
inconvenient to match with the real TaskManager, especially in container
environments.

I think a possible solution is to support a user-defined resourceId, which
we could read from the environment. For standalone on K8s, we could set the
"RESOURCE_ID" env variable to the pod name so that it is easier to match a
TaskManager with its K8s pod.

Moreover, I am afraid we cannot set the pod name to the resourceId. You can
set "deployment.meta.name", but the pod name itself is generated by K8s in
the pattern {deployment.meta.name}-{rc.uuid}-{uuid}. Instead, we would do it
the other way around and set the resourceId to the pod name.
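
A minimal Java sketch of the proposed fallback, assuming the ID is read from an environment variable named `RESOURCE_ID` as suggested above. `ResourceIdResolver` is a hypothetical helper, not Flink's actual `TaskManagerRunner` code. On Kubernetes, the pod name could be injected into that variable through the Downward API (an env entry whose `valueFrom.fieldRef.fieldPath` is `metadata.name`).

```java
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical helper (not part of Flink): picks a TaskManager resource ID
 * from the environment, falling back to a random UUID.
 */
final class ResourceIdResolver {
    // Assumed variable name, taken from the suggestion in this thread.
    static final String RESOURCE_ID_ENV = "RESOURCE_ID";

    private ResourceIdResolver() {}

    static String resolve(Map<String, String> env) {
        String configured = env.get(RESOURCE_ID_ENV);
        if (configured != null && !configured.isBlank()) {
            // User-defined ID, e.g. the pod name injected by Kubernetes.
            return configured.trim();
        }
        // Nothing configured: a random UUID, which is hard to match to a pod.
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve(System.getenv()));
    }
}
```

Passing the environment as a `Map` keeps the lookup testable; in a real entry point it would simply be `System.getenv()`.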


Best,
Yang


Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Konstantin Knauf <ko...@ververica.com>.
Hi Yangze, Hi Till,

thank you for working on this topic. I believe it will make debugging
large Apache Flink deployments much more feasible.

I was wondering whether it would make sense to allow the user to specify
the Resource ID in standalone setups. For example, many users still
implicitly use standalone clusters on Kubernetes (the native support is
still experimental), and in these cases it would be interesting to also set
the PodName as the ResourceID. What do you think?

Cheers,

Konstantin



-- 

Konstantin Knauf | Head of Product

+49 160 91394525


Follow us @VervericaData Ververica <https://www.ververica.com/>


--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng

Re: [DISCUSS] FLIP-118: Improve Flink’s ID system

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yangze,

thanks for creating this FLIP. I think it is a very good improvement that
will help our users and ourselves better understand what's going on in
Flink.

Creating the ResourceIDs with host information/pod name is a good idea.

Also deriving ExecutionGraph IDs from their superset ID is a good idea.
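
A sketch of what deriving IDs from their parent could look like, using hypothetical record types (not Flink's actual ID classes): each ID carries the ID of the entity it belongs to, so its string form reveals where it sits in the graph. It assumes Java 16+ for records.

```java
/** Hypothetical parent ID holding an opaque hex string. */
record JobVertexId(String hex) {
    @Override
    public String toString() {
        return hex;
    }
}

/** Hypothetical: one parallel subtask of a job vertex. */
record ExecutionVertexId(JobVertexId jobVertexId, int subtaskIndex) {
    @Override
    public String toString() {
        // Parent ID first, then this entity's position within the parent.
        return jobVertexId + "_" + subtaskIndex;
    }
}

/** Hypothetical: one (re)execution attempt of a subtask. */
record ExecutionAttemptId(ExecutionVertexId vertexId, int attemptNumber) {
    @Override
    public String toString() {
        return vertexId + "_" + attemptNumber;
    }
}
```

With this scheme, attempt 1 of subtask 3 of a vertex prints as `<vertex>_3_1`, and a restarted attempt differs only in the last component, so related log lines can be matched by prefix.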

The InstanceID is used for fencing purposes. I would not make it a
composition of the ResourceID + a monotonically increasing number. The
problem is that in case of an RM failure the counter would start from 0
again and this could lead to collisions.
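
The collision can be reproduced with a small, hypothetical Java sketch: two counters, standing in for the ResourceManager before and after a failover, hand out the same ID, while a random ID (a `UUID` here, as a stand-in for a random 128-bit ID) carries no state that a restart could reset.

```java
import java.util.UUID;

/** Hypothetical sketch of the two InstanceID schemes discussed above. */
final class InstanceIdSketch {
    private InstanceIdSketch() {}

    /** Counter-based scheme: ResourceID plus a monotonically increasing number. */
    static final class CounterBasedIds {
        // Lives only in the ResourceManager's memory, so a new RM starts at 0 again.
        private long counter;

        String next(String resourceId) {
            return resourceId + "-" + counter++;
        }
    }

    /** Random scheme: nothing to reset; collisions are astronomically unlikely. */
    static String randomId() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String beforeFailover = new CounterBasedIds().next("tm-1");
        String afterFailover = new CounterBasedIds().next("tm-1"); // fresh RM instance
        System.out.println(beforeFailover.equals(afterFailover)); // true: a collision
    }
}
```

A stale message fenced with `tm-1-0` from before the failover would then pass the check against the new registration, which is exactly what fencing is meant to prevent.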

Logging more information on how the different runtime IDs are correlated is
also a good idea.
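
For the correlation logging, a hypothetical helper could emit one line per pair of IDs so that searching the logs for either ID finds the other. It uses `java.util.logging` only to stay self-contained; the message format is an assumption, not an existing Flink log statement.

```java
import java.util.logging.Logger;

/** Hypothetical helper: one log line per correlated pair of runtime IDs. */
final class IdLineageLog {
    private static final Logger LOG = Logger.getLogger(IdLineageLog.class.getName());

    private IdLineageLog() {}

    static String format(String fromType, String fromId, String toType, String toId) {
        return String.format("%s %s -> %s %s", fromType, fromId, toType, toId);
    }

    static void correlate(String fromType, String fromId, String toType, String toId) {
        // Both IDs appear on the same line, so grepping for either one finds the pair.
        LOG.info(format(fromType, fromId, toType, toId));
    }
}
```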

Two other ideas for simplifying the IDs are the following:

* The SlotRequestID was introduced because the SlotPool was a separate
RpcEndpoint a while ago. With this no longer being the case I think we
could remove the SlotRequestID and replace it with the AllocationID.
* Instead of creating new SlotRequestIDs for multi task slots one could
derive them from the SlotRequestID used for requesting the underlying
AllocatedSlot.

Given that the slot sharing logic will most likely be reworked with the
pipelined region scheduling, we might be able to resolve these two points
as part of the pipelined region scheduling effort.
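
The second idea could look roughly like this hypothetical Java sketch (not Flink's slot sharing code): the IDs handed out for tasks sharing one slot are derived from the parent SlotRequestID, so each can be traced back to the request for the underlying AllocatedSlot. The `/` separator and the per-parent counter are illustrative assumptions.

```java
/** Hypothetical: derives child request IDs from the parent SlotRequestID. */
final class DerivedSlotRequestIds {
    private final String parentRequestId;
    private int nextChild;

    DerivedSlotRequestIds(String parentRequestId) {
        this.parentRequestId = parentRequestId;
    }

    /** Each task placed into the shared slot gets "<parent>/<index>". */
    String nextChildId() {
        return parentRequestId + "/" + nextChild++;
    }

    /** Recovers the parent request ID from a derived child ID. */
    static String parentOf(String childId) {
        int sep = childId.lastIndexOf('/');
        if (sep < 0) {
            throw new IllegalArgumentException("not a derived id: " + childId);
        }
        return childId.substring(0, sep);
    }
}
```

Because the parent is recoverable from the child ID alone, no extra lineage table is needed to answer "which slot request does this task belong to?".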

Cheers,
Till

On Thu, Mar 26, 2020 at 10:51 AM Yangze Guo <ka...@gmail.com> wrote:

> Hi everyone,
>
> We would like to start a discussion thread on "FLIP-118: Improve
> Flink’s ID system"[1].
>
> This FLIP mainly discusses the following issues, aiming to improve the
> readability of IDs in logs and help users debug in case of failures:
>
> - Enhance the readability of the string literals of IDs. Most of them
> are hashcodes, e.g. ExecutionAttemptID, which do not provide much
> meaningful information and are hard to recognize and compare for
> users.
> - Log the ID’s lineage information to make debugging more convenient.
> Currently, the log does not always show the lineage information
> between IDs. Finding out the relationships between entities identified by
> given IDs is a common demand, e.g., which slot (identified by its
> AllocationID) is assigned to satisfy which slot request (identified by its
> SlotRequestID). In the absence of such lineage information, it’s
> impossible to track the end-to-end lifecycle of an Execution or a Task,
> which makes debugging difficult.
>
> Key changes proposed in the FLIP are as follows:
>
> - Add location information to distributed components
> - Add topology information to graph components
> - Log the ID’s lineage information
> - Expose the identifiers of distributed components to users
>
> Please find more details in the FLIP wiki document [1]. Looking forward to
> your feedback.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148643521
>
> Best,
> Yangze Guo
>