Posted to dev@flink.apache.org by Yingjie Cao <ke...@gmail.com> on 2021/06/11 08:14:47 UTC

[DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Hi devs,

I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
Relationship with JobMaster and PartitionTracker". (These are things we
found when moving our external shuffle to the pluggable shuffle service
framework.)

The mail client may fail to display the right format. If so, please refer
to this document:
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing

Lifecycle of ShuffleMaster

Currently, the lifecycle of ShuffleMaster is unclear. The
ShuffleServiceFactory is loaded for each JobMaster instance, and then
ShuffleServiceFactory#createShuffleMaster is called to create a
ShuffleMaster instance. However, the default NettyShuffleServiceFactory
always returns the same ShuffleMaster singleton for all jobs. In the
current implementation, the lifecycle of ShuffleMaster is therefore left
open and depends on the shuffle plugin itself. On the TM side, by contrast,
the ShuffleEnvironment is part of the TaskManagerServices, whose lifecycle
is decoupled from jobs, so it behaves more like a service. This means there
is an inconsistency between the TM side and the JM side.

From my understanding, the reason is that the pluggable shuffle framework
is not completely finished yet. For example, there is a follow-up umbrella
ticket, FLINK-19551 <https://issues.apache.org/jira/browse/FLINK-19551>,
for the pluggable shuffle service framework, and one of its subtasks
(FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) aims to
load the shuffle plugin with the PluginManager. I think this can solve the
issue mentioned above. After the corresponding factory is loaded by the
PluginManager, all ShuffleMaster instances can be stored in a map indexed
by the corresponding factory class name, which can be shared by all jobs.
After that, the ShuffleMaster becomes a cluster level service, which is
consistent with the ShuffleEnvironment on the TM side.
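
To make the map idea concrete, here is a minimal sketch. It is not Flink
code: ShuffleMasterRegistry, ExternalShuffleFactory and the reduced
ShuffleMaster/ShuffleServiceFactory interfaces are hypothetical stand-ins,
and the real wiring through the PluginManager may look quite different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; none of these types are the real Flink classes. */
public class ShuffleMasterRegistrySketch {

    /** Stand-in for the real ShuffleMaster interface. */
    interface ShuffleMaster {}

    /** Stand-in for a shuffle service factory loaded as a plugin. */
    interface ShuffleServiceFactory {
        ShuffleMaster createShuffleMaster();
    }

    /** Cluster-level registry: one ShuffleMaster per factory class name. */
    static final class ShuffleMasterRegistry {
        private final Map<String, ShuffleMaster> masters = new ConcurrentHashMap<>();

        ShuffleMaster getOrCreate(ShuffleServiceFactory factory) {
            // computeIfAbsent creates the instance at most once per key.
            return masters.computeIfAbsent(
                    factory.getClass().getName(), name -> factory.createShuffleMaster());
        }

        int size() {
            return masters.size();
        }
    }

    /** Assumed example plugin factory. */
    static final class ExternalShuffleFactory implements ShuffleServiceFactory {
        @Override
        public ShuffleMaster createShuffleMaster() {
            return new ShuffleMaster() {};
        }
    }

    public static void main(String[] args) {
        ShuffleMasterRegistry registry = new ShuffleMasterRegistry();
        // Two jobs load the same factory class independently ...
        ShuffleMaster forJobA = registry.getOrCreate(new ExternalShuffleFactory());
        ShuffleMaster forJobB = registry.getOrCreate(new ExternalShuffleFactory());
        // ... and still share a single ShuffleMaster instance.
        System.out.println("shared instance: " + (forJobA == forJobB));
    }
}
```

Keying by factory class name means that two jobs configured with the same
plugin share one ShuffleMaster, while jobs using a different factory get
their own entry.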

In summary, we propose to finish FLINK-12731
<https://issues.apache.org/jira/browse/FLINK-12731> and make the shuffle
service a real cluster level service first. Furthermore, we propose to add
two lifecycle methods to the ShuffleMaster interface, start and close,
responsible for initialization (for example, contacting the external
system) and graceful shutdown (for example, releasing the resources)
respectively (these methods already exist in the ShuffleEnvironment
interface on the TM side). What do you think?

Relationship of ShuffleMaster & JobMaster

Currently, JobMaster holds a reference to the corresponding ShuffleMaster
and can register partitions with it (that is, allocate ShuffleDescriptors
from it) through the registerPartitionWithProducer method. To support use
cases like allocating external resources when a job starts and releasing
all allocated resources when a job terminates, we may also need some job
level initialization and finalization. Job level initialization and
finalization are also helpful when serving multiple jobs simultaneously.

In summary, we propose to add two job level lifecycle methods, registerJob
and unregisterJob, responsible for job level shuffle initialization and
finalization, for example, releasing all external resources occupied by the
corresponding job. What do you think?
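
As a rough illustration of the job level bookkeeping this enables, consider
the sketch below. It is an assumption-laden toy, not the proposed
interface: PerJobShuffleMaster is hypothetical, job and partition IDs are
plain Strings instead of JobID and PartitionDescriptor, and "resources" are
just names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of job level initialization and finalization. */
public class JobLifecycleSketch {

    /** Toy shuffle master that tracks external resources per job. */
    static final class PerJobShuffleMaster {
        // Job ID -> names of the external resources allocated for that job.
        private final Map<String, List<String>> resourcesByJob = new HashMap<>();

        /** Job level initialization, called once when a job starts. */
        synchronized void registerJob(String jobId) {
            resourcesByJob.putIfAbsent(jobId, new ArrayList<>());
        }

        /** Partition registration is scoped to a previously registered job. */
        synchronized String registerPartition(String jobId, String partitionId) {
            List<String> resources = resourcesByJob.get(jobId);
            if (resources == null) {
                throw new IllegalStateException("Job " + jobId + " is not registered");
            }
            String resource = "external-storage-for-" + partitionId;
            resources.add(resource);
            return resource;
        }

        /** Job level finalization: hands back everything the job still occupies. */
        synchronized List<String> unregisterJob(String jobId) {
            List<String> released = resourcesByJob.remove(jobId);
            return released == null ? new ArrayList<>() : released;
        }
    }

    public static void main(String[] args) {
        PerJobShuffleMaster master = new PerJobShuffleMaster();
        master.registerJob("job-1");
        master.registerPartition("job-1", "p1");
        master.registerPartition("job-1", "p2");
        // Everything allocated for job-1 is released in one step.
        System.out.println("released: " + master.unregisterJob("job-1"));
    }
}
```

Because every allocation is recorded under its job, unregisterJob can clean
up in one pass even when several jobs share the same ShuffleMaster.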

Relationship of ShuffleMaster & PartitionTracker

Currently, the JobMasterPartitionTracker can release external result
partitions through the releasePartitionExternally method of ShuffleMaster.
However, the shuffle plugin (ShuffleMaster) may also need the ability to
stop tracking some partitions depending on the status of the external
services. For example, if the external storage node which stores some
partitions crashes, we need to stop tracking all partitions on it at once
to avoid reproducing the lost partitions one by one. By introducing
something like a ShuffleContext which delegates to the partition tracker,
this requirement can be easily satisfied. Besides, we also need the ability
to release cluster partitions.

In summary, we propose to add a releaseDataSetExternally method to the
ShuffleMaster interface, which is responsible for releasing cluster
partitions. Besides, we propose to add a ShuffleContext which can delegate
to the PartitionTracker and stop tracking partitions. Two separate
ShuffleContext abstractions are needed, one for cluster partitions and one
for job partitions. What do you think?

Interface Change Summary

As discussed in the above sections, we propose to make some interface
changes around the ShuffleMaster interface. The first change is to pass a
ShuffleMasterContext instance to the ShuffleServiceFactory when creating
the ShuffleMaster, just like the ShuffleEnvironment creation on the TM
side. Changes are marked in bold (the same below).

public interface ShuffleServiceFactory<
        SD extends ShuffleDescriptor,
        P extends ResultPartitionWriter,
        G extends IndexedInputGate> {

    /**
     * Factory method to create a specific {@link ShuffleMaster}
     * implementation.
     */
    ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);

    /**
     * Factory method to create a specific local {@link ShuffleEnvironment}
     * implementation.
     */
    ShuffleEnvironment<P, G> createShuffleEnvironment(
            ShuffleEnvironmentContext shuffleEnvironmentContext);
}

The following is the ShuffleMasterContext interface. It will be
implemented by the pluggable shuffle framework itself and can be used by
the shuffle plugin. A context interface is easier to extend in the future.

public interface ShuffleMasterContext {

    /** Gets the cluster configuration. */
    Configuration getConfiguration();

    /** Handles the fatal error if any. */
    void onFatalError(Throwable throwable);

    /**
     * Stops tracking the target dataset (cluster partitions), which means
     * these data can not be reused anymore.
     */
    CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);

    /**
     * Returns IDs of all datasets (cluster partitions) being tracked by
     * this cluster currently.
     */
    CompletableFuture<List<IntermediateDataSetID>> listDataSets();
}
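
To illustrate why a context object is easier to extend, here is a small
sketch. Everything in it is an assumption made for illustration: the
reduced ShuffleMasterContext below keeps only onFatalError, describeCluster
is an invented later addition (not part of this proposal), and
RecordingContext and ExternalShuffleMaster are hypothetical.

```java
/** Hypothetical sketch of extending a context interface compatibly. */
public class ContextEvolutionSketch {

    /** Reduced stand-in for the proposed ShuffleMasterContext. */
    interface ShuffleMasterContext {
        /** Present from the first version of the interface. */
        void onFatalError(Throwable throwable);

        /**
         * Invented later addition. The default body keeps implementations
         * written against the first version compiling.
         */
        default String describeCluster() {
            return "unknown";
        }
    }

    /** Context implementation written against the first version only. */
    static final class RecordingContext implements ShuffleMasterContext {
        Throwable lastError;

        @Override
        public void onFatalError(Throwable throwable) {
            lastError = throwable;
        }
    }

    /** Plugin-side master; its constructor signature never has to change. */
    static final class ExternalShuffleMaster {
        private final ShuffleMasterContext context;

        ExternalShuffleMaster(ShuffleMasterContext context) {
            this.context = context;
        }

        /** Reports an unrecoverable failure to the framework. */
        void start(boolean externalSystemReachable) {
            if (!externalSystemReachable) {
                context.onFatalError(
                        new IllegalStateException("external shuffle system unreachable"));
            }
        }

        String describeCluster() {
            return context.describeCluster();
        }
    }

    public static void main(String[] args) {
        RecordingContext context = new RecordingContext();
        ExternalShuffleMaster master = new ExternalShuffleMaster(context);
        master.start(false);
        System.out.println("reported: " + context.lastError.getMessage());
        System.out.println("cluster: " + master.describeCluster());
    }
}
```

New capabilities arrive as new context methods, so the
createShuffleMaster(ShuffleMasterContext) signature can stay fixed.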

The second part to be enhanced is the ShuffleMaster interface. Methods to
be added include start, close, registerJob, unregisterJob and
releaseDataSetExternally. In addition, because each ShuffleMaster instance
can serve multiple jobs simultaneously, when registering partitions, one
should also provide the corresponding JobID. The following shows the
updated ShuffleMaster interface:

public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    /**
     * Starts this shuffle master, for example getting the access and
     * connecting to the external system.
     */
    void start() throws Exception;

    /** Closes this shuffle master which releases all resources. */
    void close() throws Exception;

    /** Registers the target job to this shuffle master. */
    void registerJob(JobShuffleContext context);

    /** Unregisters the target job from this shuffle master. */
    void unregisterJob(JobID jobID);

    /**
     * Asynchronously register a partition and its producer with the
     * shuffle service.
     */
    CompletableFuture<T> registerPartitionWithProducer(
            JobID jobID,
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);

    /** Releases any external resources occupied by the given partition. */
    void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);

    /** Releases the target cluster partitions stored externally if any. */
    void releaseDataSetExternally(IntermediateDataSetID dataSetID);
}

The following is the JobShuffleContext interface. It will be implemented
by the pluggable shuffle framework itself and can be used by the shuffle
plugin.

public interface JobShuffleContext {

    /** Gets the corresponding job configuration. */
    Configuration getConfiguration();

    /** Gets the corresponding {@link JobID}. */
    JobID getJobID();

    /**
     * Stops tracking the target result partitions, which means these
     * partitions will be reproduced if used afterwards.
     */
    CompletableFuture<Void> stopTrackingPartitions(
            Collection<ResultPartitionID> partitionIDS);

    /**
     * Returns information of all partitions being tracked for the current
     * job.
     */
    CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
}
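
The storage node crash case from the PartitionTracker section could use
this context roughly as sketched below. This is only a sketch under
assumptions: the JobShuffleContext here is reduced to one method with
String IDs instead of ResultPartitionID, and PartitionLocations and
RecordingContext are hypothetical helpers, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of reporting all lost partitions in one call. */
public class NodeFailureSketch {

    /** Reduced stand-in for the proposed JobShuffleContext. */
    interface JobShuffleContext {
        CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIds);
    }

    /** Plugin-side bookkeeping of which storage node holds which partitions. */
    static final class PartitionLocations {
        private final JobShuffleContext context;
        private final Map<String, List<String>> partitionsByNode = new HashMap<>();

        PartitionLocations(JobShuffleContext context) {
            this.context = context;
        }

        void recordPartition(String nodeId, String partitionId) {
            partitionsByNode.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(partitionId);
        }

        /** Stops tracking every partition on the crashed node with one request. */
        CompletableFuture<Void> onNodeLost(String nodeId) {
            List<String> lost = partitionsByNode.remove(nodeId);
            if (lost == null || lost.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            return context.stopTrackingPartitions(lost);
        }
    }

    /** Test double standing in for the framework-side implementation. */
    static final class RecordingContext implements JobShuffleContext {
        final List<List<String>> calls = new ArrayList<>();

        @Override
        public CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIds) {
            calls.add(new ArrayList<>(partitionIds));
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        RecordingContext context = new RecordingContext();
        PartitionLocations locations = new PartitionLocations(context);
        locations.recordPartition("node-a", "p1");
        locations.recordPartition("node-a", "p2");
        locations.recordPartition("node-b", "p3");
        locations.onNodeLost("node-a").join();
        System.out.println("stop-tracking calls: " + context.calls);
    }
}
```

The point is that the plugin, which knows the partition-to-node mapping,
reports the whole batch itself, so the job does not have to discover each
lost partition through a separate failure.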

What do you think of these changes? Any feedback is highly appreciated.

Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Posted by Yingjie Cao <ke...@gmail.com>.
Hi Zhu,

Thanks for the reply.

> One question which might be out of the scope is whether we should do
> similar things for ShuffleEnvironment?

I agree we should also consider fatal error handling for ShuffleEnvironment
eventually.

> I think the proposal does not conflict with this target. One idea in my
> mind is to maintain multiple different ShuffleServices in the
> Dispatcher/JobManagerSharedServices and let them be shared between
> different jobs. Each job should be configured with a key which
> points to a ShuffleService. The key should be used by both the scheduler
> and tasks on task managers to select their respective
> ShuffleMaster/ShuffleEnvironment. This will need work both on the master
> and on the worker. Currently the worker will launch a ShuffleEnvironment
> shared between different tasks which can be from different jobs. But only
> one single ShuffleEnvironment will be created on each task manager.

Yes, exactly. We need to do more to support that, and the proposal
mentioned in your comment makes a lot of sense.

Best,
Yingjie

On Fri, Jul 9, 2021 at 11:53 AM, Zhu Zhu <re...@gmail.com> wrote:

> Thanks for starting this discussion.
> Here are some of my thoughts regarding the proposal and discussions
> above.
>
> *+1 to enable ShuffleMaster to stop tracking partitions proactively*
> In production we have encountered cases where it takes *hours* to
> recover from the loss of a remote shuffle worker, because the lost
> finished partitions cannot be detected and reproduced all at once.
> This improvement can help to solve this problem.
>
>
> *+1 to make ShuffleMaster a cluster level component*
> This helps to avoid maintaining multiple clients and connections to the
> same remote shuffle service. It also makes it possible to support
> external cluster partitions in the future.
>
> *+1 to enable ShuffleMaster to notify master about its internal *
> *non-recoverable error*
> The scheduler can keep failing or hang due to a ShuffleMaster internal
> non-recoverable error. Currently this kind of problem cannot be
> auto-recovered and is hard to diagnose.
> One question which might be out of the scope is whether we
> should do similar things for ShuffleEnvironment?
>
>
> *+1 that the abstraction should be able to support different jobs to*
> *use different ShuffleServices eventually*
> I think the proposal does not conflict with this target.
> One idea in my mind is to maintain multiple different ShuffleServices
> in the Dispatcher/JobManagerSharedServices and let them be shared
> between different jobs. Each job should be configured with a key which
> points to a ShuffleService. The key should be used by both the scheduler
> and tasks on task managers to select their respective
> ShuffleMaster/ShuffleEnvironment. This will need work both on the master
> and on the worker. Currently the worker will launch a ShuffleEnvironment
> shared between different tasks which can be from different jobs. But only
> one single ShuffleEnvironment will be created on each task manager.
>
> Thanks,
> Zhu
>
> On Thu, Jul 8, 2021 at 11:43 AM, Yingjie Cao <ke...@gmail.com> wrote:
>
> > Hi,
> >
> > Thanks for the reply.
> >
> > @Guowei
> > I agree that we can move forward step by step and start from the most
> > important part. Apart from the two points mentioned in your reply,
> > initializing and shutting down some external resources gracefully is also
> > important, which is a reason for the open/close methods.
> > About the cluster partitions and the ShuffleMasterContext, I agree that we
> > can postpone handling the cluster partitions because we need to do more to
> > support them. For ShuffleMasterContext, I think we still need it even if
> > we do not support the cluster partitions in the first step. Currently, the
> > shuffle master can only access the cluster configuration. Beyond that, I
> > think we also need the ability to handle fatal errors occurring in the
> > ShuffleMaster gracefully by propagating the errors to the framework.
> > By introducing the ShuffleMasterContext, we can give ShuffleMaster the
> > ability to access both the cluster configuration and the fatal error
> > handler. Instead of passing these components directly to the ShuffleMaster,
> > a ShuffleMasterContext interface can keep compatibility easily in the
> > future. Even if we add some new methods in the future, we can offer default
> > empty implementations in the interface, which keeps compatibility.
> > About the JobShuffleContext::getConfiguration/listPartitions methods, I
> > agree that we can remove them in the first step and add them back
> > later. As mentioned above, we can easily keep compatibility based on the
> > Context interface.
> >
> > @Till
> > I totally agree that we should support that different jobs use different
> > shuffle services and the proposed solution will support this use case
> > eventually.
> >
> > Best,
> > Yingjie
> >
> > On Wed, Jul 7, 2021 at 8:15 PM, Till Rohrmann <tr...@apache.org> wrote:
> >
> > > One quick comment: When developing the ShuffleService abstraction we also
> > > thought that different jobs might want to use different ShuffleServices
> > > depending on their workload (e.g. batch vs. streaming workload). So
> > > ideally, the chosen solution here can also support this use case
> > > eventually.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <gu...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > Thanks to Yingjie for initiating this discussion. As I understand it,
> > > > the document[1] mainly discusses two issues:
> > > > 1. ShuffleMaster should be at the cluster level instead of the job level
> > > > 2. ShuffleMaster should notify PartitionTracker that some data has been
> > > > lost
> > > >
> > > > Relatively speaking, I think the second problem is more serious, because
> > > > for external or remote batch shuffling services, after the machine
> > > > storing shuffled data goes offline, PartitionTracker needs to be notified
> > > > in time to avoid repeated failures of the job. Therefore, it is hoped
> > > > that when shuffle data goes offline due to a machine error, ShuffleMaster
> > > > can notify the PartitionTracker in time. This requires ShuffleMaster to
> > > > notify the PartitionTracker with a handle such as JobShuffleContext.
> > > >
> > > > So how to pass JobShuffleContext to ShuffleMaster? There are two options:
> > > > 1. After ShuffleMaster is created, pass JobShuffleContext to
> > > > ShuffleMaster, such as ShuffleMaster::register(JobShuffleContext)
> > > > 2. Pass JobShuffleContext when creating ShuffleMaster, such as
> > > > ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
> > > >
> > > > Which one to choose is actually related to issue 1. If ShuffleMaster
> > > > is at the cluster level, you should choose option 1; otherwise,
> > > > choose option 2. I think ShuffleMaster should be at the cluster level,
> > > > for example, because we don't need to maintain a ShuffleMaster for each
> > > > job in a SessionCluster; in addition, this ShuffleMaster should also be
> > > > used by RM's PartitionTracker in the future. Therefore, I think option 1
> > > > is more appropriate.
> > > >
> > > > To sum up, we may give priority to solving problem 2, while taking into
> > > > account that ShuffleMaster should be a cluster-level component.
> > > > Therefore, I think we could ignore the ShuffleMasterContext at the
> > > > beginning; at the same time,
> > > > JobShuffleContext::getConfiguration/listPartitions should not be needed.
> > > >
> > > > [1] https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> > > >
> > > > Best,
> > > > Guowei
> > > >
> > > >
> > > > On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao <kevin.yingjie@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about "Lifecycle of ShuffleMaster
> and
> > > its
> > > > > Relationship with JobMaster and PartitionTracker". (These are
> things
> > we
> > > > > found when moving our external shuffle to the pluggable shuffle
> > service
> > > > > framework.)
> > > > >
> > > > > The mail client may fail to display the right format. If so, please
> > > refer
> > > > > to this document:
> > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing
> > > > > .
> > > > > Lifecycle of ShuffleMaster
> > > > >
> > > > > Currently, the lifecycle of ShuffleMaster seems unclear.  The
> > > > > ShuffleServiceFactory is loaded for each JobMaster instance and
> then
> > > > > ShuffleServiceFactory#createShuffleMaster will be called to create
> a
> > > > > ShuffleMaser instance. However, the default
> > NettyShuffleServiceFactory
> > > > > always returns the same ShuffleMaser singleton instance for all
> jobs.
> > > > Based
> > > > > on the current implementation, the lifecycle of ShuffleMaster seems
> > > open
> > > > > and depends on the shuffle plugin themselves. However, at the TM
> > side,
> > > > > the ShuffleEnvironment
> > > > > is a part of the TaskManagerServices whose lifecycle is decoupled
> > with
> > > > jobs
> > > > > which is more like a service. It means there is also an
> inconsistency
> > > > > between the TM side and the JM side.
> > > > >
> > > > > From my understanding, the reason for this is that the pluggable
> > > shuffle
> > > > > framework is still not completely finished yet, for example, there
> > is a
> > > > > follow up umbrella ticket  FLINK-19551
> > > > > <https://issues.apache.org/jira/browse/FLINK-19551> for the
> > pluggable
> > > > > shuffle service framework and in its subtasks, there is one task (
> > > > > FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>)
> > which
> > > > > aims
> > > > > to load shuffle plugin with the PluginManager. I think this can
> solve
> > > the
> > > > > issue mentioned above. After the corresponding factory  loaded by
> the
> > > > > PluginManager, all ShuffleMaster instances can be stored in a map
> > > indexed
> > > > > by the corresponding factory class name  which can be shared by all
> > > jobs.
> > > > > After that, the ShuffleMaster becomes a cluster level service which
> > is
> > > > > consistent with the ShuffleEnvironment at the TM side.
> > > > >
> > > > > As a summary, we propose to finish FLINK-12731
> > > > > <https://issues.apache.org/jira/browse/FLINK-12731> and make the
> > > shuffle
> > > > > service a real cluster level service first. Furthermore, we add two
> > > > > lifecycle methods to the ShuffleMaster interface, including start
> and
> > > > > close responding
> > > > > for initialization (for example, contacting the external system)
> and
> > > > > graceful shutdown (for example, releasing the resources)
> respectively
> > > > > (these methods already exist in the ShuffleEnvironment interface at
> > the
> > > > TM
> > > > > side). What do you think?
> > > > > Relationship of ShuffleMaster & JobMaster
> > > > >
> > > > > Currently, JobMaster holds the reference to the corresponding
> > > > ShuffleMaster
> > > > > and it can register partitions (allocate ShuffleDescriptor from) to
> > > > > ShuffleMaster
> > > > > by the registerPartitionWithProducer method. To support use cases
> > like
> > > > > allocating external resources when a job starts and releasing all
> > > > allocated
> > > > > resources when a job terminates, we may also need some job level
> > > > > initialization and finalization. These job level initialization and
> > > > > finalization are also helpful when serving multiple jobs
> > > simultaneously.
> > > > >
> > > > > As a summary,  we propose to add two job level lifecycle methods
> > > > > registerJob
> > > > > and unregisterJob responding for job level shuffle initialization
> and
> > > > > finalization, for example, releasing all external resources
> occupied
> > by
> > > > the
> > > > > corresponding job. What do you think?
> > > > > Relationship of ShuffleMaster & PartitionTracker
> > > > >
> > > > > Currently, the JobMasterPartitionTracker can release external
> result
> > > > > partitions through the releasePartitionExternally method of
> > > > ShuffleMaster.
> > > > > However, the shuffle plugin (ShuffleMaster) may also need the
> ability
> > > of
> > > > > stopping  tracking some partitions depending on the status of the
> > > > external
> > > > > services, for example, if the external storage node which stores
> some
> > > > > partitions crashes, we need to stop tracking all partitions in it
> to
> > > > avoid
> > > > > reproducing the lost partitions one by one. By introducing
> something
> > > like
> > > > > ShuffleContext which delegates to the partition tracker, this
> > > requirement
> > > > > can be easily satisfied. Besides, for cluster partitions, we also
> > need
> > > to
> > > > > have the ability to release them.
> > > > >
> > > > > As a summary, we propose to add a releaseDataSetExternally method
> to
> > > > > the ShuffleMaster
> > > > > interface which is responsible for releasing cluster partitions.
> > > Besides,
> > > > > we propose to add a ShuffleContext which can delegate to the
> > > > > PartitionTracker and stop tracking partitions. For the cluster
> > > partitions
> > > > > and job partitions, two separated ShuffleContext abstracts are
> > needed.
> > > > > What do you think?
> > > > > Interface Change Summary
> > > > >
> > > > > As discussed in the above sections, we propose to make some
> interface
> > > > > changes around the ShuffleMaster interface. The first change is to
> > > > > pass a ShuffleMasterContex
> > > > > instance to the ShuffleServiceFactory when creating the
> ShuffleMaster
> > > > just
> > > > > like the ShuffleEnvironment creation at the TM side. Changes are
> > marked
> > > > > with bold texts (the same below).
> > > > >
> > > > > public interface ShuffleServiceFactory<
> > > > >         SD extends ShuffleDescriptor, P extends
> > ResultPartitionWriter,
> > > G
> > > > > extends IndexedInputGate> {
> > > > >
> > > > >     /**
> > > > >     * Factory method to create a specific {@link ShuffleMaster}
> > > > > implementation.
> > > > >     */
> > > > >     ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext
> > > > > shuffleMasterContext);
> > > > >
> > > > >     /**
> > > > >     * Factory method to create a specific local {@link
> > > > ShuffleEnvironment}
> > > > > implementation.
> > > > >     */
> > > > >     ShuffleEnvironment<P, G> createShuffleEnvironment(
> > > > >             ShuffleEnvironmentContext shuffleEnvironmentContext);
> > > > > }
> > > > >
> > > > > The following  is the ShuffleMasterContext interface. It will be
> > > > > implemented by the pluggable shuffle framework itself and can be
> used
> > > by
> > > > > the shuffle plugin. A context Interface is more friendly if we want
> > to
> > > > > extend it in the future.
> > > > >
> > > > > public interface ShuffleMasterContext {
> > > > >
> > > > >     /** Gets the cluster configuration. */
> > > > >     Configuration getConfiguration();
> > > > >
> > > > >     /** Handles the fatal error if any. */
> > > > >     void onFatalError(Throwable throwable);
> > > > >
> > > > >     /**
> > > > >     * Stops tracking the target dataset (cluster partitions), which
> > > means
> > > > > these data can not be reused anymore.
> > > > >     */
> > > > >     CompletableFuture<Void>
> stopTrackingDataSet(IntermediateDataSetID
> > > > > dataSetID);
> > > > >
> > > > >     /** Returns IDs of all datasets (cluster partitions) being
> > tracked
> > > by
> > > > > this cluster currently. */
> > > > >     CompletableFuture<List<IntermediateDataSetID>> listDataSets();
> > > > > }
> > > > >
> > > > > The second part to be enhanced is the ShuffleMaster interface.
> > Methods
> > > to
> > > > > be added include start, close, registerJob, unregisterJob and
> > > > > releaseDataSetExternally. In addition, because each ShuffleMaster
> > > > instance
> > > > > can serve multiple jobs simultaneously, when registering
> partitions,
> > > one
> > > > > should also provide the corresponding JobID. The following shows
> the
> > > > > updated ShuffleMaster interface:
> > > > >
> > > > > public interface ShuffleMaster<T extends ShuffleDescriptor> extends
> > > > > AutoCloseable {
> > > > >
> > > > >     /**
> > > > >     * Starts this shuffle master, for example getting the access
> and
> > > > > connecting to the external
> > > > >     * system.
> > > > >     */
> > > > >     void start() throws Exception;
> > > > >
> > > > >     /** Closes this shuffle master which releases all resources. */
> > > > >     void close() throws Exception;
> > > > >
> > > > >     /** Registers the target job to this shuffle master. */
> > > > >     void registerJob(JobShuffleContext context);
> > > > >
> > > > >     /** Unregisters the target job from this shuffle master. */
> > > > >     void unregisterJob(JobID jobID);
> > > > >
> > > > >     /** Asynchronously register a partition and its producer with
> the
> > > > > shuffle service. */
> > > > >     CompletableFuture<T> registerPartitionWithProducer(
> > > > >             JobID jobID,
> > > > >             PartitionDescriptor partitionDescriptor,
> > > > >             ProducerDescriptor producerDescriptor);
> > > > >
> > > > >     /** Releases any external resources occupied by the given
> > > partition.
> > > > */
> > > > >     void releasePartitionExternally(ShuffleDescriptor
> > > shuffleDescriptor);
> > > > >
> > > > >     /** Releases the target cluster partitions stored externally if
> > > any.
> > > > */
> > > > >     void releaseDataSetExternally(IntermediateDataSetID dataSetID);
> > > > > }
> > > > >
> > > > > The following  is the JobShuffleContext interface. It will be
> > > implemented
> > > > > by the pluggable shuffle framework itself and can be used by the
> > > shuffle
> > > > > plugin.
> > > > >
> > > > > public interface JobShuffleContext {
> > > > >
> > > > >     /** Gets the corresponding job configuration. */
> > > > >     Configuration getConfiguration();
> > > > >
> > > > >     /** Gets the corresponding {@link JobID}. */
> > > > >     JobID getJobID();
> > > > >
> > > > >     /**
> > > > >     * Stops tracking the target result partitions, which means
> these
> > > > > partitions will be reproduced if used afterwards.
> > > > >     */
> > > > >     CompletableFuture<Void>
> > > > > stopTrackingPartitions(Collection<ResultPartitionID>
> > > > > partitionIDS);
> > > > >
> > > > >     /** Returns information of all partitions being tracked for the
> > > > current
> > > > > job. */
> > > > >     CompletableFuture<List<ResultPartitionDeploymentDescriptor>>
> > > > > listPartitions();
> > > > > }
> > > > >
> > > > > What do you think of these changes? Any feedback is highly
> > appreciated.
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Posted by Zhu Zhu <re...@gmail.com>.
Thanks for starting this discussion.
Here are some of my thoughts regarding the proposal and discussions
above.

*+1 to enable ShuffleMaster to stop tracking partitions proactively*
In production we have encountered cases where it takes *hours* to
recover from the loss of a remote shuffle worker, because the lost
finished partitions cannot be detected and reproduced all at once.
This improvement can help to solve this problem.


*+1 to make ShuffleMaster a cluster level component*
This helps to avoid maintaining multiple clients and connections to the
same remote shuffle service. It also makes it possible to support
external cluster partitions in the future.

*+1 to enable ShuffleMaster to notify master about its internal *
*non-recoverable error*
The scheduler can keep failing or hang due to a ShuffleMaster internal
non-recoverable error. Currently this kind of problem cannot be
auto-recovered and is hard to diagnose.
One question which might be out of the scope is whether we
should do similar things for ShuffleEnvironment?


*+1 that the abstraction should be able to support different jobs to*
*use different ShuffleServices eventually*
I think the proposal does not conflict with this target.
One idea in my mind is to maintain multiple different ShuffleServices
in the Dispatcher/JobManagerSharedServices and let them be shared
between different jobs. Each job should be configured with a key which
points to a ShuffleService. The key should be used by both the scheduler
and tasks on task managers to select their respective
ShuffleMaster/ShuffleEnvironment. This will need work both on the master
and on the worker. Currently the worker will launch a ShuffleEnvironment
shared between different tasks which can be from different jobs. But only
one single ShuffleEnvironment will be created on each task manager.

Thanks,
Zhu

On Thu, Jul 8, 2021 at 11:43 AM, Yingjie Cao <ke...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply.
>
> @Guowei
> I agree that we can move forward step by step and start from the most
> important part. Apart from the two points mentioned in your reply,
> initializing and shutting down some external resources gracefully is also
> important, which is a reason for the open/close methods.
> About the cluster partitions and the ShuffleMasterContext, I agree that we
> can postpone handling the cluster partitions because we need to do more to
> support them. For ShuffleMasterContext, I think we still need it even if we
> do not support the cluster partitions in the first step. Currently, the
> shuffle master can only access the cluster configuration. Beyond that, I
> think we also need the ability to handle fatal errors occurring in the
> ShuffleMaster gracefully by propagating the errors to the framework.
> By introducing the ShuffleMasterContext, we can give ShuffleMaster the
> ability to access both the cluster configuration and the fatal error
> handler. Instead of passing these components directly to the ShuffleMaster,
> a ShuffleMasterContext interface can keep compatibility easily in the
> future. Even if we add some new methods in the future, we can offer default
> empty implementations in the interface, which keeps compatibility.
> About the JobShuffleContext::getConfiguration/listPartitions methods, I
> agree that we can remove them in the first step and add them back
> later. As mentioned above, we can easily keep compatibility based on the
> Context interface.
>
> @Till
> I totally agree that we should support that different jobs use different
> shuffle services and the proposed solution will support this use case
> eventually.
>
> Best,
> Yingjie
>
> On Wed, Jul 7, 2021 at 8:15 PM, Till Rohrmann <tr...@apache.org> wrote:
>
> > One quick comment: When developing the ShuffleService abstraction we also
> > thought that different jobs might want to use different ShuffleServices
> > depending on their workload (e.g. batch vs. streaming workload). So
> > ideally, the chosen solution here can also support this use case
> > eventually.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <gu...@gmail.com> wrote:
> >
> > > Hi,
> > > Thanks Yingjie for initiating this discussion. As I understand it, the
> > > document[1] mainly discusses two issues:
> > > 1. ShuffleMaster should be at the cluster level instead of the job level
> > > 2. ShuffleMaster should notify PartitionTracker that some data has been
> > > lost
> > >
> > > Relatively speaking, I think the second problem is more serious, because
> > > for external or remote batch shuffle services, after the machine storing
> > > shuffled data goes offline, PartitionTracker needs to be notified in time
> > > to avoid repeated failures of the job. Therefore, it is hoped that when
> > > shuffle data goes offline due to a machine error, ShuffleMaster can
> > > notify the PartitionTracker in time. This requires ShuffleMaster to
> > > notify the PartitionTracker through a handle such as JobShuffleContext.
> > >
> > > So how do we pass JobShuffleContext to ShuffleMaster? There are two
> > > options:
> > > 1. After ShuffleMaster is created, pass JobShuffleContext to
> > > ShuffleMaster, such as ShuffleMaster::register(JobShuffleContext)
> > > 2. Pass JobShuffleContext when creating ShuffleMaster, such as
> > > ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
> > >
> > > Which one to choose is actually related to issue 1: if ShuffleMaster is
> > > at the cluster level, you should choose option 1; otherwise, choose
> > > option 2. I think ShuffleMaster should be at the cluster level, for
> > > example, because we don't need to maintain a ShuffleMaster for each job
> > > in a SessionCluster; in addition, this ShuffleMaster should also be used
> > > by RM's PartitionTracker in the future. Therefore, I think Option 1 is
> > > more appropriate.
> > >
> > > To sum up, we may give priority to solving problem 2, while taking into
> > > account that ShuffleMaster should be a cluster-level component.
> > > Therefore, I think we could ignore the ShuffleMasterContext at the
> > > beginning; at the same time,
> > > JobShuffleContext::getConfiguration/listPartitions should not be needed.
> > >
> > > [1]
> > > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> > >
> > > Best,
> > > Guowei
> > >

Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Posted by Yingjie Cao <ke...@gmail.com>.
Hi,

Thanks for the reply.

@Guowei
I agree that we can move forward step by step and start from the most
important part. Apart from the two points mentioned in your reply,
initializing and shutting down external resources gracefully is also
important, which is the reason for the start/close methods.
About the cluster partitions and the ShuffleMasterContext: I agree that we
can postpone handling the cluster partitions because supporting them needs
more work. As for the ShuffleMasterContext, I think we still need it even if
we do not support cluster partitions in the first step. Currently, the
shuffle master can only access the cluster configuration. Beyond that, I
think we also need the ability to handle fatal errors occurring in the
ShuffleMaster gracefully by propagating them to the framework.
By introducing the ShuffleMasterContext, we can give the ShuffleMaster
access to both the cluster configuration and the fatal error handler.
Compared with passing these components directly to the ShuffleMaster, a
ShuffleMasterContext interface makes it easy to keep compatibility in the
future: even if we add new methods later, we can offer default empty
implementations in the interface, which keeps compatibility.
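
To make the compatibility argument concrete, here is a minimal sketch of
what such a context interface could look like. It is only an illustration
under assumptions: the type names are made up, a plain Map stands in for
Flink's Configuration, and getClusterId() is a hypothetical later addition
that is not part of the proposal.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the proposed ShuffleMasterContext (not the real Flink interface). */
interface SketchShuffleMasterContext {

    /** Gets the cluster configuration; a Map stands in for Flink's Configuration here. */
    Map<String, String> getConfiguration();

    /** Propagates a fatal error from the shuffle master to the framework. */
    void onFatalError(Throwable throwable);

    /** Hypothetical method added later; the default body keeps older implementations compiling. */
    default String getClusterId() {
        return "unknown";
    }
}

/** An implementation written before getClusterId() existed; it needs no change. */
final class RecordingContext implements SketchShuffleMasterContext {
    final AtomicReference<Throwable> lastFatalError = new AtomicReference<>();

    @Override
    public Map<String, String> getConfiguration() {
        // The key and value are made-up example settings.
        return Collections.singletonMap("shuffle.sketch.address", "localhost:1234");
    }

    @Override
    public void onFatalError(Throwable throwable) {
        lastFatalError.set(throwable);
    }
}

/** A plugin-side shuffle master that reports unrecoverable failures through the context. */
final class FatalErrorReportingMaster {
    private final SketchShuffleMasterContext context;

    FatalErrorReportingMaster(SketchShuffleMasterContext context) {
        this.context = context;
    }

    /** Called by the plugin when it cannot recover, e.g. the external service is gone. */
    void onExternalServiceLost(String reason) {
        context.onFatalError(new IllegalStateException(reason));
    }
}
```

The plugin only talks to the context, so the framework can grow the
interface without changing the ShuffleMaster constructor or factory
signature again.
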
About the JobShuffleContext::getConfiguration/listPartitions methods: I
agree that we can remove them in the first step and add them back
later. As mentioned above, we can easily keep compatibility based on the
Context interface.
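
As an illustration of the trimmed-down JobShuffleContext, the sketch below
keeps only getJobID and stopTrackingPartitions and shows how a shuffle
master could use it when an external storage node is lost. Strings stand in
for JobID and ResultPartitionID, and NodeAwareShuffleMaster, recordPartition
and onStorageNodeLost are hypothetical names; listPartitions appears as a
default method only to indicate how it could be added back later.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical first-step JobShuffleContext; Strings stand in for JobID / ResultPartitionID. */
interface SketchJobShuffleContext {

    String getJobID();

    /** Asks the partition tracker to stop tracking the given partitions. */
    CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIDs);

    /** Could be added back later without breaking existing implementations. */
    default CompletableFuture<List<String>> listPartitions() {
        return CompletableFuture.completedFuture(Collections.emptyList());
    }
}

/** Sketch of a cluster-level shuffle master serving several jobs. */
final class NodeAwareShuffleMaster {
    private final Map<String, SketchJobShuffleContext> jobs = new HashMap<>();
    // jobID -> storage node -> partitions stored on that node
    private final Map<String, Map<String, Set<String>>> partitionsByJobAndNode = new HashMap<>();

    void registerJob(SketchJobShuffleContext context) {
        jobs.put(context.getJobID(), context);
    }

    void unregisterJob(String jobID) {
        jobs.remove(jobID);
        partitionsByJobAndNode.remove(jobID);
    }

    void recordPartition(String jobID, String node, String partitionID) {
        partitionsByJobAndNode
                .computeIfAbsent(jobID, k -> new HashMap<>())
                .computeIfAbsent(node, k -> new HashSet<>())
                .add(partitionID);
    }

    /** On node loss, stop tracking all affected partitions of each job in one call. */
    void onStorageNodeLost(String node) {
        for (Map.Entry<String, Map<String, Set<String>>> entry : partitionsByJobAndNode.entrySet()) {
            Set<String> lost = entry.getValue().remove(node);
            SketchJobShuffleContext context = jobs.get(entry.getKey());
            if (lost != null && context != null) {
                context.stopTrackingPartitions(lost);
            }
        }
    }
}
```

Reporting the whole set at once is the point of the handle: the job does
not have to discover the lost partitions one failure at a time.
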

@Till
I totally agree that we should support different jobs using different
shuffle services, and the proposed solution will support this use case
eventually.
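
One way to picture this: cluster-level shuffle masters are kept in a map
keyed by the factory class name, so jobs configured with the same factory
share one instance while jobs with different factories get different ones.
A minimal sketch, assuming hypothetical names (PooledShuffleMaster,
ShuffleMasterRegistrySketch) and a Supplier in place of the real
ShuffleServiceFactory:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified shuffle master with the two proposed lifecycle methods. */
interface PooledShuffleMaster extends AutoCloseable {
    void start();

    @Override
    void close();
}

/** Keeps one started shuffle master per factory class name for the whole cluster. */
final class ShuffleMasterRegistrySketch implements AutoCloseable {
    private final Map<String, PooledShuffleMaster> masters = new HashMap<>();

    /** Returns the shared instance for the factory, creating and starting it on first use. */
    synchronized PooledShuffleMaster getOrCreate(
            String factoryClassName, Supplier<? extends PooledShuffleMaster> creator) {
        PooledShuffleMaster existing = masters.get(factoryClassName);
        if (existing != null) {
            return existing;
        }
        PooledShuffleMaster created = creator.get();
        created.start();
        masters.put(factoryClassName, created);
        return created;
    }

    /** Closes every shuffle master when the cluster shuts down. */
    @Override
    public synchronized void close() {
        for (PooledShuffleMaster master : masters.values()) {
            master.close();
        }
        masters.clear();
    }
}
```

A batch job and a streaming job would simply pass different factory class
names and end up with different shuffle masters.
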

Best,
Yingjie


Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Posted by Till Rohrmann <tr...@apache.org>.
One quick comment: When developing the ShuffleService abstraction we also
thought that different jobs might want to use different ShuffleServices
depending on their workload (e.g. batch vs. streaming workload). So
ideally, the chosen solution here can also support this use case eventually.

Cheers,
Till

On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma <gu...@gmail.com> wrote:

> Hi,
> Thanks Yingjie for initiating this discussion. As I understand it, the
> document[1] mainly discusses two issues:
> 1. ShuffleMaster should be at the cluster level instead of the job level
> 2. ShuffleMaster should notify PartitionTracker that some data has been
> lost
>
> Relatively speaking, I think the second problem is more serious, because
> for external or remote batch shuffle services, after the machine storing
> shuffled data goes offline, PartitionTracker needs to be notified in time
> to avoid repeated failures of the job. Therefore, it is hoped that when
> shuffle data goes offline due to a machine error, ShuffleMaster can notify
> the PartitionTracker in time. This requires ShuffleMaster to notify the
> PartitionTracker through a handle such as JobShuffleContext.
>
> So how do we pass JobShuffleContext to ShuffleMaster? There are two options:
> 1. After ShuffleMaster is created, pass JobShuffleContext to ShuffleMaster,
> such as ShuffleMaster::register(JobShuffleContext)
> 2. Pass JobShuffleContext when creating ShuffleMaster, such as
> ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
>
> Which one to choose is actually related to issue 1: if ShuffleMaster is
> at the cluster level, you should choose option 1; otherwise, choose
> option 2. I think ShuffleMaster should be at the cluster level, for
> example, because we don't need to maintain a ShuffleMaster for each job in
> a SessionCluster; in addition, this ShuffleMaster should also be used by
> RM's PartitionTracker in the future. Therefore, I think Option 1 is more
> appropriate.
>
> To sum up, we may give priority to solving problem 2, while taking into
> account that ShuffleMaster should be a cluster-level component. Therefore,
> I think we could ignore the ShuffleMasterContext at the beginning; at
> the same time, JobShuffleContext::getConfiguration/listPartitions should
> not be needed.
>
> [1]
>
> https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
>
> Best,
> Guowei
>
>
> On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao <ke...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
> > Relationship with JobMaster and PartitionTracker". (These are things we
> > found when moving our external shuffle to the pluggable shuffle service
> > framework.)
> >
> > The mail client may fail to display the right format. If so, please refer
> > to this document:
> > https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing
> >
> > Lifecycle of ShuffleMaster
> >
> > Currently, the lifecycle of ShuffleMaster seems unclear. The
> > ShuffleServiceFactory is loaded for each JobMaster instance and then
> > ShuffleServiceFactory#createShuffleMaster will be called to create a
> > ShuffleMaster instance. However, the default NettyShuffleServiceFactory
> > always returns the same ShuffleMaster singleton instance for all jobs.
> > Based on the current implementation, the lifecycle of ShuffleMaster seems
> > open and depends on the shuffle plugins themselves. However, at the TM
> > side, the ShuffleEnvironment is a part of the TaskManagerServices, whose
> > lifecycle is decoupled from jobs and which is more like a service. It
> > means there is also an inconsistency between the TM side and the JM side.
> >
> > From my understanding, the reason for this is that the pluggable shuffle
> > framework is not completely finished yet. For example, there is a
> > follow-up umbrella ticket FLINK-19551
> > <https://issues.apache.org/jira/browse/FLINK-19551> for the pluggable
> > shuffle service framework, and among its subtasks there is one task
> > (FLINK-12731 <https://issues.apache.org/jira/browse/FLINK-12731>) which
> > aims to load the shuffle plugin with the PluginManager. I think this can
> > solve the issue mentioned above. After the corresponding factory is loaded
> > by the PluginManager, all ShuffleMaster instances can be stored in a map
> > indexed by the corresponding factory class name, which can be shared by
> > all jobs. After that, the ShuffleMaster becomes a cluster level service,
> > which is consistent with the ShuffleEnvironment at the TM side.
> >
> > As a summary, we propose to finish FLINK-12731
> > <https://issues.apache.org/jira/browse/FLINK-12731> and make the shuffle
> > service a real cluster level service first. Furthermore, we add two
> > lifecycle methods to the ShuffleMaster interface, start and close,
> > responsible for initialization (for example, contacting the external
> > system) and graceful shutdown (for example, releasing the resources)
> > respectively (these methods already exist in the ShuffleEnvironment
> > interface at the TM side). What do you think?
> >
> > Relationship of ShuffleMaster & JobMaster
> >
> > Currently, JobMaster holds the reference to the corresponding
> > ShuffleMaster and it can register partitions to (allocate
> > ShuffleDescriptor from) ShuffleMaster by the
> > registerPartitionWithProducer method. To support use cases like
> > allocating external resources when a job starts and releasing all
> > allocated resources when a job terminates, we may also need some job
> > level initialization and finalization. These job level initialization
> > and finalization are also helpful when serving multiple jobs
> > simultaneously.
> >
> > As a summary, we propose to add two job level lifecycle methods,
> > registerJob and unregisterJob, responsible for job level shuffle
> > initialization and finalization, for example, releasing all external
> > resources occupied by the corresponding job. What do you think?
> >
> > Relationship of ShuffleMaster & PartitionTracker
> >
> > Currently, the JobMasterPartitionTracker can release external result
> > partitions through the releasePartitionExternally method of
> > ShuffleMaster. However, the shuffle plugin (ShuffleMaster) may also need
> > the ability to stop tracking some partitions depending on the status of
> > the external services. For example, if the external storage node which
> > stores some partitions crashes, we need to stop tracking all partitions
> > in it to avoid reproducing the lost partitions one by one. By
> > introducing something like ShuffleContext which delegates to the
> > partition tracker, this requirement can be easily satisfied. Besides,
> > for cluster partitions, we also need to have the ability to release them.
> >
> > As a summary, we propose to add a releaseDataSetExternally method to
> > the ShuffleMaster interface, which is responsible for releasing cluster
> > partitions. Besides, we propose to add a ShuffleContext which can
> > delegate to the PartitionTracker and stop tracking partitions. For the
> > cluster partitions and job partitions, two separate ShuffleContext
> > abstractions are needed. What do you think?
> >
> > Interface Change Summary
> >
> > As discussed in the above sections, we propose to make some interface
> > changes around the ShuffleMaster interface. The first change is to pass
> > a ShuffleMasterContext instance to the ShuffleServiceFactory when
> > creating the ShuffleMaster, just like the ShuffleEnvironment creation at
> > the TM side. Changes are marked with bold text (the same below).
> >
> > public interface ShuffleServiceFactory<
> >         SD extends ShuffleDescriptor,
> >         P extends ResultPartitionWriter,
> >         G extends IndexedInputGate> {
> >
> >     /** Factory method to create a specific {@link ShuffleMaster} implementation. */
> >     ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext);
> >
> >     /** Factory method to create a specific local {@link ShuffleEnvironment} implementation. */
> >     ShuffleEnvironment<P, G> createShuffleEnvironment(
> >             ShuffleEnvironmentContext shuffleEnvironmentContext);
> > }
> >
> > The following is the ShuffleMasterContext interface. It will be
> > implemented by the pluggable shuffle framework itself and can be used by
> > the shuffle plugin. A context interface is more friendly if we want to
> > extend it in the future.
> >
> > public interface ShuffleMasterContext {
> >
> >     /** Gets the cluster configuration. */
> >     Configuration getConfiguration();
> >
> >     /** Handles the fatal error if any. */
> >     void onFatalError(Throwable throwable);
> >
> >     /**
> >      * Stops tracking the target dataset (cluster partitions), which means these data can not
> >      * be reused anymore.
> >      */
> >     CompletableFuture<Void> stopTrackingDataSet(IntermediateDataSetID dataSetID);
> >
> >     /** Returns IDs of all datasets (cluster partitions) being tracked by this cluster currently. */
> >     CompletableFuture<List<IntermediateDataSetID>> listDataSets();
> > }
> >
> > The second part to be enhanced is the ShuffleMaster interface. Methods to
> > be added include start, close, registerJob, unregisterJob and
> > releaseDataSetExternally. In addition, because each ShuffleMaster
> > instance can serve multiple jobs simultaneously, when registering
> > partitions, one should also provide the corresponding JobID. The
> > following shows the updated ShuffleMaster interface:
> >
> > public interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {
> >
> >     /**
> >      * Starts this shuffle master, for example getting the access and connecting to the
> >      * external system.
> >      */
> >     void start() throws Exception;
> >
> >     /** Closes this shuffle master which releases all resources. */
> >     void close() throws Exception;
> >
> >     /** Registers the target job to this shuffle master. */
> >     void registerJob(JobShuffleContext context);
> >
> >     /** Unregisters the target job from this shuffle master. */
> >     void unregisterJob(JobID jobID);
> >
> >     /** Asynchronously register a partition and its producer with the shuffle service. */
> >     CompletableFuture<T> registerPartitionWithProducer(
> >             JobID jobID,
> >             PartitionDescriptor partitionDescriptor,
> >             ProducerDescriptor producerDescriptor);
> >
> >     /** Releases any external resources occupied by the given partition. */
> >     void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
> >
> >     /** Releases the target cluster partitions stored externally if any. */
> >     void releaseDataSetExternally(IntermediateDataSetID dataSetID);
> > }
> >
> > The following is the JobShuffleContext interface. It will be implemented
> > by the pluggable shuffle framework itself and can be used by the shuffle
> > plugin.
> >
> > public interface JobShuffleContext {
> >
> >     /** Gets the corresponding job configuration. */
> >     Configuration getConfiguration();
> >
> >     /** Gets the corresponding {@link JobID}. */
> >     JobID getJobID();
> >
> >     /**
> >      * Stops tracking the target result partitions, which means these partitions will be
> >      * reproduced if used afterwards.
> >      */
> >     CompletableFuture<Void> stopTrackingPartitions(Collection<ResultPartitionID> partitionIDS);
> >
> >     /** Returns information of all partitions being tracked for the current job. */
> >     CompletableFuture<List<ResultPartitionDeploymentDescriptor>> listPartitions();
> > }
> >
> > What do you think of these changes? Any feedback is highly appreciated.
> >
>

Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

Posted by Guowei Ma <gu...@gmail.com>.
Hi,
Thanks, Yingjie, for initiating this discussion. As I understand it, the
document [1] mainly discusses two issues:
1. ShuffleMaster should be at the cluster level instead of the job level.
2. ShuffleMaster should notify PartitionTracker that some data has been lost.

Relatively speaking, I think the second problem is more serious. For
external or remote batch shuffle services, once the machine storing
shuffle data goes offline, PartitionTracker needs to be notified in time
to avoid repeated failures of the job. In other words, when shuffle data
becomes unavailable due to a machine error, ShuffleMaster should be able to
tell the PartitionTracker promptly. This requires giving ShuffleMaster a
handle to the PartitionTracker, such as JobShuffleContext.
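
A minimal sketch of this notification path may help. It is not Flink code:
plain strings stand in for JobID and ResultPartitionID, and
SketchJobShuffleContext, RecordingJobShuffleContext, SketchShuffleMaster,
recordPartitionLocation and onStorageNodeLost are hypothetical names made up
for illustration. Only getJobID and stopTrackingPartitions mirror methods from
the proposal.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/** Demo entry point: two partitions of job-1 live on node-a, which then goes offline. */
class LostNodeDemo {
    public static void main(String[] args) {
        RecordingJobShuffleContext context = new RecordingJobShuffleContext("job-1");
        SketchShuffleMaster master = new SketchShuffleMaster();
        master.register(context);
        master.recordPartitionLocation("job-1", "p-1", "node-a");
        master.recordPartitionLocation("job-1", "p-2", "node-a");
        master.recordPartitionLocation("job-1", "p-3", "node-b");

        master.onStorageNodeLost("node-a");
        // prints "Stopped tracking: [p-1, p-2]"
        System.out.println("Stopped tracking: " + context.stoppedPartitions);
    }
}

/** Reduced stand-in for the proposed JobShuffleContext (assumption: string IDs). */
interface SketchJobShuffleContext {
    String getJobID();

    CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIDs);
}

/** Hypothetical context that only records what the shuffle master asked it to stop tracking. */
class RecordingJobShuffleContext implements SketchJobShuffleContext {
    final Set<String> stoppedPartitions = new TreeSet<>();
    private final String jobID;

    RecordingJobShuffleContext(String jobID) {
        this.jobID = jobID;
    }

    @Override
    public String getJobID() {
        return jobID;
    }

    @Override
    public CompletableFuture<Void> stopTrackingPartitions(Collection<String> partitionIDs) {
        stoppedPartitions.addAll(partitionIDs);
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical shuffle master that remembers which storage node holds which partitions. */
class SketchShuffleMaster {
    private final Map<String, SketchJobShuffleContext> contextsByJob = new HashMap<>();
    // storage node -> job ID -> partitions stored on that node
    private final Map<String, Map<String, Set<String>>> partitionsByNode = new HashMap<>();

    void register(SketchJobShuffleContext context) {
        contextsByJob.put(context.getJobID(), context);
    }

    void recordPartitionLocation(String jobID, String partitionID, String node) {
        partitionsByNode
                .computeIfAbsent(node, n -> new HashMap<>())
                .computeIfAbsent(jobID, j -> new HashSet<>())
                .add(partitionID);
    }

    /** Reports all partitions on the lost node with one call per affected job. */
    void onStorageNodeLost(String node) {
        Map<String, Set<String>> lost = partitionsByNode.remove(node);
        if (lost == null) {
            return;
        }
        lost.forEach((jobID, partitions) -> {
            SketchJobShuffleContext context = contextsByJob.get(jobID);
            if (context != null) {
                context.stopTrackingPartitions(partitions);
            }
        });
    }
}
```

The point of the sketch is that the job learns about every partition on the
lost node at once, rather than discovering them through separate task failures.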

So how should JobShuffleContext be passed to ShuffleMaster? There are two
options:
1. After ShuffleMaster is created, pass JobShuffleContext to it, for example
via ShuffleMaster::register(JobShuffleContext).
2. Pass JobShuffleContext when creating ShuffleMaster, for example via
ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).

Which one to choose depends on issue 1. If ShuffleMaster is a cluster-level
component, it is created once and serves many jobs, so a per-job context can
only be handed over after creation, which is option 1; otherwise, option 2
works. I think ShuffleMaster should be at the cluster level, for example,
because we don't need to maintain a ShuffleMaster for each job in a session
cluster; in addition, this ShuffleMaster should also be used by RM's
PartitionTracker in the future. Therefore, I think option 1 is more
appropriate.
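
To make the difference concrete, here is a rough sketch of the two options in
a session cluster running two jobs. All class names (JobContextSketch,
ClusterLevelShuffleMaster, PerJobShuffleMaster, PerJobShuffleServiceFactory)
are hypothetical and only illustrate the shape of each option; they are not
the real Flink interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo entry point comparing both options for two jobs in one session cluster. */
class RegistrationOptionsDemo {
    public static void main(String[] args) {
        // Option 1: one master for the whole cluster; jobs register afterwards.
        ClusterLevelShuffleMaster shared = new ClusterLevelShuffleMaster();
        shared.register(new JobContextSketch("job-1"));
        shared.register(new JobContextSketch("job-2"));
        // prints "Option 1: 1 master serving [job-1, job-2]"
        System.out.println("Option 1: 1 master serving " + shared.registeredJobs());

        // Option 2: the factory needs a job context, so every job gets its own master.
        PerJobShuffleServiceFactory factory = new PerJobShuffleServiceFactory();
        factory.createShuffleMaster(new JobContextSketch("job-1"));
        factory.createShuffleMaster(new JobContextSketch("job-2"));
        // prints "Option 2: 2 masters"
        System.out.println("Option 2: " + factory.createdMasters() + " masters");
    }
}

/** Hypothetical stand-in for JobShuffleContext; only carries a job ID here. */
class JobContextSketch {
    final String jobID;

    JobContextSketch(String jobID) {
        this.jobID = jobID;
    }
}

/** Option 1: created without any job, then told about each job via register. */
class ClusterLevelShuffleMaster {
    private final List<String> jobs = new ArrayList<>();

    void register(JobContextSketch context) {
        jobs.add(context.jobID);
    }

    List<String> registeredJobs() {
        return jobs;
    }
}

/** Option 2: bound to exactly one job at construction time. */
class PerJobShuffleMaster {
    final JobContextSketch context;

    PerJobShuffleMaster(JobContextSketch context) {
        this.context = context;
    }
}

/** Option 2 factory: cannot create a master before a job context exists. */
class PerJobShuffleServiceFactory {
    private int created;

    PerJobShuffleMaster createShuffleMaster(JobContextSketch context) {
        created++;
        return new PerJobShuffleMaster(context);
    }

    int createdMasters() {
        return created;
    }
}
```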

To sum up, we may give priority to solving problem 2, while taking into
account that ShuffleMaster should be a cluster-level component. Therefore,
I think we could leave out ShuffleMasterContext at the beginning; likewise,
JobShuffleContext::getConfiguration/listPartitions should not be needed.

[1]
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit

Best,
Guowei

