Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/12/12 18:54:17 UTC

Optimizing for super long checkpoint times

Hi,

We're running a job with over 100 GiB of state. For our initial
run we wanted to keep things simple, so we allocated a single core node
with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between 4
disks on that machine). Overall, things are actually moving pretty
smoothly, except for checkpointing. Checkpoints are set to be incremental,
yet they're all in the range of 10-20 GiB -- we do have a lot of data being
updated in real-time, retracts+appends -- and they take around 10-30 min.
We have the Taskmanager set to checkpoint every 5 min, which means we're
spending the majority of our time just checkpointing.
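
For concreteness, the relevant pieces of our setup look roughly like the
sketch below (the checkpoint URI and helper name are placeholders, not our
real config):

    import java.io.IOException;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Sketch of the setup described above, for Flink 1.11. */
    static void configureCheckpointing(StreamExecutionEnvironment env) throws IOException {
        // Incremental checkpoints require the RocksDB state backend; the
        // second constructor argument turns them on. The URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("s3://bucket/checkpoints", true));
        env.enableCheckpointing(5 * 60 * 1000); // checkpoint every 5 min
    }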

My question is, what sort of bottlenecks should we be investigating and
what are some things we can try to improve our checkpoint times?

Some things we're considering are:
Increasing parallelism, hoping that this will partition the data and each
operator will therefore checkpoint faster.
Changing the time between checkpoints, though we don't have a good
understanding of how this might affect total time (see the sketch below).
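
For the second item, the knobs we'd be looking at are roughly these
(continuing the sketch above; values are illustrative):

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Sketch: space checkpoints out instead of only triggering on an interval. */
    static void spaceOutCheckpoints(StreamExecutionEnvironment env) {
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Guarantees processing time between checkpoints; with 10-30 min
        // checkpoints this matters more than the trigger interval alone.
        checkpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000);
        // Give slow checkpoints room before they are declared failed.
        checkpointConfig.setCheckpointTimeout(60 * 60 * 1000);
    }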

Also, we are hesitant to use unaligned checkpointing at the moment and are
hoping for some other options.
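
(If we do end up trying it, our understanding is that on 1.11 it is a
one-line switch on the same config object as above; untested sketch:)

    // Untested sketch: unaligned checkpoints, new in Flink 1.11.
    env.getCheckpointConfig().enableUnalignedCheckpoints();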

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Optimizing for super long checkpoint times

Posted by Rex Fenley <Re...@remind101.com>.
Ah wow, we're using 1.11.2, so I'll try out using the UI. If for whatever
reason that doesn't work, I really appreciate you including some code to
work off of.

Thanks!

On Mon, Dec 14, 2020 at 10:34 AM Jaffe, Julian <Ju...@activision.com>
wrote:

> If you’re using Flink 1.11 or later
> <https://issues.apache.org/jira/browse/FLINK-14816>, you should be able
> to take thread dumps via the UI. If you have access to the machines that
> are running the actual Flink processes, you can use jcmd or the other
> various java CLI tools. If you’re using an earlier version of Flink and
> don’t have the necessary permissions on the physical hosts, you’ll have to
> add a little code to do this manually. Sample code cobbled together from
> stack overflow:
>
>     /**
>      * Print a thread dump and per-thread CPU usage information to the
>      * task manager log.
>      */
>     private void takeThreadDump() {
>         StringBuffer threadDump = new StringBuffer(System.lineSeparator());
>         long id = Thread.currentThread().getId();
>         threadDump.append(
>             String.format("******************************** Thread Dump in thread %d:\n\n", id));
>         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
>         Map<Long, Long> usageMap = new HashMap<>();
>         for (ThreadInfo threadInfo : threadMXBean.dumpAllThreads(true, true)) {
>             usageMap.put(
>                 threadInfo.getThreadId(),
>                 threadMXBean.getThreadCpuTime(threadInfo.getThreadId())
>             );
>             threadDump.append(threadInfo.toString());
>         }
>         threadDump.append("\n\n");
>         threadDump.append(
>             usageMap
>                 .entrySet()
>                 .stream()
>                 .map(e -> e.getKey().toString() + ": " + e.getValue())
>                 .collect(Collectors.joining("\n"))
>         );
>         threadDump.append("\n\n********************************");
>         LOG.info(threadDump.toString());
>     }
>
>
>
>
>
> Note that you won’t get nids with this approach. If you need more context
> in the stack traces, you’ll have to replace the threadInfo.toString() call
> with a separate function that works deeper into the stack. For instance, to
> bump the hard-coded max frames from 8 to 32:
>
>
>
>     /**
>      * Copied from ThreadInfo.toString() but with MAX_FRAMES bumped to 32.
>      *
>      * @param thread The ThreadInfo object to generate a stack trace for.
>      * @return The stack trace for THREAD with up to 32 frames included.
>      */
>     private String longThreadStackTraceString(ThreadInfo thread) {
>         StringBuilder sb = new StringBuilder("\"" + thread.getThreadName() + "\"" +
>             " Id=" + thread.getThreadId() + " " +
>             thread.getThreadState());
>         if (thread.getLockName() != null) {
>             sb.append(" on " + thread.getLockName());
>         }
>         if (thread.getLockOwnerName() != null) {
>             sb.append(" owned by \"" + thread.getLockOwnerName() +
>                 "\" Id=" + thread.getLockOwnerId());
>         }
>         if (thread.isSuspended()) {
>             sb.append(" (suspended)");
>         }
>         if (thread.isInNative()) {
>             sb.append(" (in native)");
>         }
>         sb.append('\n');
>         int i = 0;
>         StackTraceElement[] stackTrace = thread.getStackTrace();
>         for (; i < stackTrace.length && i < 32; i++) {
>             StackTraceElement ste = stackTrace[i];
>             sb.append("\tat " + ste.toString());
>             sb.append('\n');
>             if (i == 0 && thread.getLockInfo() != null) {
>                 Thread.State ts = thread.getThreadState();
>                 switch (ts) {
>                     case BLOCKED:
>                         sb.append("\t-  blocked on " + thread.getLockInfo());
>                         sb.append('\n');
>                         break;
>                     case WAITING:
>                     case TIMED_WAITING:
>                         sb.append("\t-  waiting on " + thread.getLockInfo());
>                         sb.append('\n');
>                         break;
>                     default:
>                 }
>             }
>
>             for (MonitorInfo mi : thread.getLockedMonitors()) {
>                 if (mi.getLockedStackDepth() == i) {
>                     sb.append("\t-  locked " + mi);
>                     sb.append('\n');
>                 }
>             }
>         }
>         if (i < stackTrace.length) {
>             sb.append("\t...");
>             sb.append('\n');
>         }
>
>         LockInfo[] locks = thread.getLockedSynchronizers();
>         if (locks.length > 0) {
>             sb.append("\n\tNumber of locked synchronizers = " + locks.length);
>             sb.append('\n');
>             for (LockInfo li : locks) {
>                 sb.append("\t- " + li);
>                 sb.append('\n');
>             }
>         }
>         sb.append('\n');
>         return sb.toString();
>     }
>
>
>
> Then you can use longThreadStackTraceString(threadInfo) instead of
> threadInfo.toString().
>
>
>
>
>
> If you have some smart way to trigger the thread dumps (e.g. you have a
> control channel in your flink app or you have some way of detecting a
> long-running checkpoint), use that approach to capture the thread dump.
> Otherwise, just set up a new scheduled thread pool and call
> takeThreadDump() every n minutes. Make sure to take the thread dump on
> whichever manager group is actually doing the work (likely the task
> managers)!
>
>
>
> Julian
>
>
>
> *From: *Rex Fenley <Re...@remind101.com>
> *Date: *Sunday, December 13, 2020 at 11:24 AM
> *To: *Steven Wu <st...@gmail.com>
> *Cc: *user <us...@flink.apache.org>, Brad Davis <br...@remind101.com>
> *Subject: *Re: Optimizing for super long checkpoint times
>
>
>
> I like that idea though I'm not sure the best way to go about that. Do you
> have any suggestions?
>
>
>
> On Sun, Dec 13, 2020 at 11:06 AM Steven Wu <st...@gmail.com> wrote:
>
> maybe do a thread dump while the checkpoint is in progress?
>
>
>
> On Sat, Dec 12, 2020 at 5:23 PM Rex Fenley <Re...@remind101.com> wrote:
>
> As in, yes, other than checkpoint latency which is very long, everything
> is healthy with no lag.
>
>
>
> So a few observations, it appears that everything is waiting on 1 join to
> finish during a checkpoint, most operators finish between 50ms and 2min.
>
> The operator:
>
> Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid,
> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
> organization_ids, teacher_organization_ids, admin_organization_ids,
> user_id, owner_teacher_or_admin_archived_group_ids],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid,
> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
> organization_ids, teacher_organization_ids, admin_organization_ids,
> owner_teacher_or_admin_archived_group_ids])
>
>
>
> Also, while everything is waiting on that operator to finish, all our
> resources seem basically idle, and then at the end there's a big spike in
> network bytes out before operators go back to executing. You can see in the
> first graph an initial spike where every operator but 1 presumably does its
> checkpointing, a long idle time, then a final spike, then back to work.
> Other graphs included as well to demonstrate idle time + disk latency.
> Checkpoint starts at 16:56 and ends at 17:13:
>
>
>
>
>
>
> On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <st...@gmail.com> wrote:
>
> > things are actually moving pretty smoothly
>
>
>
> Do you mean the job is otherwise healthy? like there is no lag etc.
>
>
>
> Do you see any bottleneck at system level, like CPU, network, disk I/O
> etc.?
>
>
>
> On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:
>
> Hi,
>
>
>
> We're running a job with over 100 GiB of state. For our initial
> run we wanted to keep things simple, so we allocated a single core node
> with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between 4
> disks on that machine). Overall, things are actually moving pretty
> smoothly, except for checkpointing. Checkpoints are set to be incremental,
> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
> updated in real-time, retracts+appends -- and they take around 10-30 min.
> We have the Taskmanager set to checkpoint every 5 min, which means we're
> spending the majority of our time just checkpointing.
>
>
>
> My question is, what sort of bottlenecks should we be investigating and
> what are some things we can try to improve our checkpoint times?
>
>
>
> Some things we're considering are:
>
> Increasing parallelism, hoping that this will partition the data and each
> operator will therefore checkpoint faster.
>
> Changing time between checkpoints, though we don't have a good
> understanding of how this might affect total time.
>
>
>
> Also, we are hesitant to use unaligned checkpointing at the moment and are
> hoping for some other options.
>
>
>
> Thanks!
>
>
> --
>
> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>
>
>
> *Remind.com* <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>
> --
>
> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>
>
>
> *Remind.com* <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
>
>
> --
>
> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>
>
>
> *Remind.com* <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Optimizing for super long checkpoint times

Posted by "Jaffe, Julian" <Ju...@activision.com>.
If you’re using Flink 1.11 or later<https://issues.apache.org/jira/browse/FLINK-14816>, you should be able to take thread dumps via the UI. If you have access to the machines that are running the actual Flink processes, you can use jcmd or the other various Java CLI tools. If you’re using an earlier version of Flink and don’t have the necessary permissions on the physical hosts, you’ll have to add a little code to do this manually. Sample code cobbled together from Stack Overflow:

    // Imports needed by the two methods below; LOG is assumed to be the
    // enclosing class's slf4j Logger.
    import java.lang.management.LockInfo;
    import java.lang.management.ManagementFactory;
    import java.lang.management.MonitorInfo;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    /**
     * Print a thread dump and per-thread CPU usage information to the task manager log.
     */
    private void takeThreadDump() {
        StringBuffer threadDump = new StringBuffer(System.lineSeparator());
        long id = Thread.currentThread().getId();
        threadDump.append(
            String.format("******************************** Thread Dump in thread %d:\n\n", id));
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        Map<Long, Long> usageMap = new HashMap<>();
        // Record each thread's cumulative CPU time alongside its stack dump.
        for (ThreadInfo threadInfo : threadMXBean.dumpAllThreads(true, true)) {
            usageMap.put(
                threadInfo.getThreadId(),
                threadMXBean.getThreadCpuTime(threadInfo.getThreadId())
            );
            threadDump.append(threadInfo.toString());
        }
        threadDump.append("\n\n");
        // Append a per-thread CPU usage listing ("thread id: CPU time in ns").
        threadDump.append(
            usageMap
                .entrySet()
                .stream()
                .map(e -> e.getKey().toString() + ": " + e.getValue())
                .collect(Collectors.joining("\n"))
        );
        threadDump.append("\n\n********************************");
        LOG.info(threadDump.toString());
    }


Note that you won’t get nids with this approach. If you need more context in the stack traces, you’ll have to replace the threadInfo.toString() call with a separate function that works deeper into the stack. For instance, to bump the hard-coded max frames from 8 to 32:

    /**
     * Copied from ThreadInfo.toString() but with MAX_FRAMES bumped to 32.
     *
     * @param thread The ThreadInfo object to generate a stack trace for.
     * @return The stack trace for THREAD with up to 32 frames included.
     */
    private String longThreadStackTraceString(ThreadInfo thread) {
        StringBuilder sb = new StringBuilder("\"" + thread.getThreadName() + "\"" +
            " Id=" + thread.getThreadId() + " " +
            thread.getThreadState());
        if (thread.getLockName() != null) {
            sb.append(" on " + thread.getLockName());
        }
        if (thread.getLockOwnerName() != null) {
            sb.append(" owned by \"" + thread.getLockOwnerName() +
                "\" Id=" + thread.getLockOwnerId());
        }
        if (thread.isSuspended()) {
            sb.append(" (suspended)");
        }
        if (thread.isInNative()) {
            sb.append(" (in native)");
        }
        sb.append('\n');
        int i = 0;
        StackTraceElement[] stackTrace = thread.getStackTrace();
        for (; i < stackTrace.length && i < 32; i++) {
            StackTraceElement ste = stackTrace[i];
            sb.append("\tat " + ste.toString());
            sb.append('\n');
            if (i == 0 && thread.getLockInfo() != null) {
                Thread.State ts = thread.getThreadState();
                switch (ts) {
                    case BLOCKED:
                        sb.append("\t-  blocked on " + thread.getLockInfo());
                        sb.append('\n');
                        break;
                    case WAITING:
                    case TIMED_WAITING:
                        sb.append("\t-  waiting on " + thread.getLockInfo());
                        sb.append('\n');
                        break;
                    default:
                }
            }

            for (MonitorInfo mi : thread.getLockedMonitors()) {
                if (mi.getLockedStackDepth() == i) {
                    sb.append("\t-  locked " + mi);
                    sb.append('\n');
                }
            }
        }
        if (i < stackTrace.length) {
            sb.append("\t...");
            sb.append('\n');
        }

        LockInfo[] locks = thread.getLockedSynchronizers();
        if (locks.length > 0) {
            sb.append("\n\tNumber of locked synchronizers = " + locks.length);
            sb.append('\n');
            for (LockInfo li : locks) {
                sb.append("\t- " + li);
                sb.append('\n');
            }
        }
        sb.append('\n');
        return sb.toString();
    }

Then you can use longThreadStackTraceString(threadInfo) instead of threadInfo.toString().
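
That is, the append call inside takeThreadDump()’s loop becomes:

    threadDump.append(longThreadStackTraceString(threadInfo));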


If you have some smart way to trigger the thread dumps (e.g. you have a control channel in your flink app or you have some way of detecting a long-running checkpoint), use that approach to capture the thread dump. Otherwise, just set up a new scheduled thread pool and call takeThreadDump() every n minutes. Make sure to take the thread dump on whichever manager group is actually doing the work (likely the task managers)!
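
A minimal sketch of the scheduled variant (it assumes the takeThreadDump()
method above lives on the same class; the 5-minute period is arbitrary):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Sketch: dump threads every 5 minutes from a single scheduler thread. */
    private void scheduleThreadDumps() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::takeThreadDump, 5, 5, TimeUnit.MINUTES);
    }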

Julian

From: Rex Fenley <Re...@remind101.com>
Date: Sunday, December 13, 2020 at 11:24 AM
To: Steven Wu <st...@gmail.com>
Cc: user <us...@flink.apache.org>, Brad Davis <br...@remind101.com>
Subject: Re: Optimizing for super long checkpoint times

I like that idea though I'm not sure the best way to go about that. Do you have any suggestions?

On Sun, Dec 13, 2020 at 11:06 AM Steven Wu <st...@gmail.com> wrote:
maybe do a thread dump while the checkpoint is in progress?

On Sat, Dec 12, 2020 at 5:23 PM Rex Fenley <Re...@remind101.com> wrote:
As in, yes, other than checkpoint latency which is very long, everything is healthy with no lag.

So a few observations, it appears that everything is waiting on 1 join to finish during a checkpoint, most operators finish between 50ms and 2min.
The operator:
Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid, first_name, last_name, signature, prefix, birthdate, user_roles, group_ids, organization_ids, teacher_organization_ids, admin_organization_ids, user_id, owner_teacher_or_admin_archived_group_ids], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid, first_name, last_name, signature, prefix, birthdate, user_roles, group_ids, organization_ids, teacher_organization_ids, admin_organization_ids, owner_teacher_or_admin_archived_group_ids])

Also, while everything is waiting on that operator to finish, all our resources seem basically idle, and then at the end there's a big spike in network bytes out before operators go back to executing. You can see in the first graph an initial spike where every operator but 1 presumably does its checkpointing, a long idle time, then a final spike, then back to work. Other graphs included as well to demonstrate idle time + disk latency. Checkpoint starts at 16:56 and ends at 17:13:
[image: Screen Shot 2020-12-12 at 5.15.36 PM.png]
[image: Screen Shot 2020-12-12 at 5.16.00 PM.png]

[image: Screen Shot 2020-12-12 at 5.16.34 PM.png]
[image: Screen Shot 2020-12-12 at 5.16.07 PM.png]

On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <st...@gmail.com> wrote:
> things are actually moving pretty smoothly

Do you mean the job is otherwise healthy? like there is no lag etc.

Do you see any bottleneck at system level, like CPU, network, disk I/O etc.?

On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:
Hi,

We're running a job with over 100 GiB of state. For our initial run we wanted to keep things simple, so we allocated a single core node with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between 4 disks on that machine). Overall, things are actually moving pretty smoothly, except for checkpointing. Checkpoints are set to be incremental, yet they're all in the range of 10-20 GiB -- we do have a lot of data being updated in real-time, retracts+appends -- and they take around 10-30 min. We have the Taskmanager set to checkpoint every 5 min, which means we're spending the majority of our time just checkpointing.

My question is, what sort of bottlenecks should we be investigating and what are some things we can try to improve our checkpoint times?

Some things we're considering are:
Increasing parallelism, hoping that this will partition the data and each operator will therefore checkpoint faster.
Changing time between checkpoints, though we don't have a good understanding of how this might affect total time.

Also, we are hesitant to use unaligned checkpointing at the moment and are hoping for some other options.

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>


--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>


--

Rex Fenley  |  Software Engineer - Mobile and Backend



Remind.com<https://www.remind.com/> |  BLOG<http://blog.remind.com/>  |  FOLLOW US<https://twitter.com/remindhq>  |  LIKE US<https://www.facebook.com/remindhq>

Re: Optimizing for super long checkpoint times

Posted by Rex Fenley <Re...@remind101.com>.
I like that idea though I'm not sure the best way to go about that. Do you
have any suggestions?

On Sun, Dec 13, 2020 at 11:06 AM Steven Wu <st...@gmail.com> wrote:

> maybe do a thread dump while the checkpoint is in progress?
>
> On Sat, Dec 12, 2020 at 5:23 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> As in, yes, other than checkpoint latency which is very long, everything
>> is healthy with no lag.
>>
>> So a few observations, it appears that everything is waiting on 1 join to
>> finish during a checkpoint, most operators finish between 50ms and 2min.
>> The operator:
>> Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid,
>> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
>> organization_ids, teacher_organization_ids, admin_organization_ids,
>> user_id, owner_teacher_or_admin_archived_group_ids],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid,
>> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
>> organization_ids, teacher_organization_ids, admin_organization_ids,
>> owner_teacher_or_admin_archived_group_ids])
>>
>> Also, while everything is waiting on that operator to finish, all our
>> resources seem basically idle, and then at the end there's a big spike in
>> network bytes out before operators go back to executing. You can see in the
>> first graph an initial spike where every operator but 1 presumably does its
>> checkpointing, a long idle time, then a final spike, then back to work.
>> Other graphs included as well to demonstrate idle time + disk latency.
>> Checkpoint starts at 16:56 and ends at 17:13:
>> [image: Screen Shot 2020-12-12 at 5.15.36 PM.png]
>> [image: Screen Shot 2020-12-12 at 5.16.00 PM.png]
>>
>> [image: Screen Shot 2020-12-12 at 5.16.34 PM.png]
>>
>> [image: Screen Shot 2020-12-12 at 5.16.07 PM.png]
>>
>> On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <st...@gmail.com> wrote:
>>
>>> > things are actually moving pretty smoothly
>>>
>>> Do you mean the job is otherwise healthy? like there is no lag etc.
>>>
>>> Do you see any bottleneck at system level, like CPU, network, disk I/O
>>> etc.?
>>>
>>> On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We're running a job with over 100 GiB of state. For our
>>>> initial run we wanted to keep things simple, so we allocated a single core
>>>> node with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between
>>>> 4 disks on that machine). Overall, things are actually moving pretty
>>>> smoothly, except for checkpointing. Checkpoints are set to be incremental,
>>>> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
>>>> updated in real-time, retracts+appends -- and they take around 10-30 min.
>>>> We have the Taskmanager set to checkpoint every 5 min, which means we're
>>>> spending the majority of our time just checkpointing.
>>>>
>>>> My question is, what sort of bottlenecks should we be investigating and
>>>> what are some things we can try to improve our checkpoint times?
>>>>
>>>> Some things we're considering are:
>>>> Increasing parallelism, hoping that this will partition the data and
>>>> each operator will therefore checkpoint faster.
>>>> Changing time between checkpoints, though we don't have a good
>>>> understanding of how this might affect total time.
>>>>
>>>> Also, we are hesitant to use unaligned checkpointing at the moment and
>>>> are hoping for some other options.
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Optimizing for super long checkpoint times

Posted by Steven Wu <st...@gmail.com>.
Maybe do a thread dump while the checkpoint is in progress?

On Sat, Dec 12, 2020 at 5:23 PM Rex Fenley <Re...@remind101.com> wrote:

> As in, yes, other than checkpoint latency which is very long, everything
> is healthy with no lag.
>
> So a few observations, it appears that everything is waiting on 1 join to
> finish during a checkpoint, most operators finish between 50ms and 2min.
> The operator:
> Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid,
> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
> organization_ids, teacher_organization_ids, admin_organization_ids,
> user_id, owner_teacher_or_admin_archived_group_ids],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid,
> first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
> organization_ids, teacher_organization_ids, admin_organization_ids,
> owner_teacher_or_admin_archived_group_ids])
>
> Also, while everything is waiting on that operator to finish, all our
> resources seem basically idle, and then at the end there's a big spike in
> network bytes out before operators go back to executing. You can see in the
> first graph an initial spike where every operator but 1 presumably does its
> checkpointing, a long idle time, then a final spike, then back to work.
> Other graphs included as well to demonstrate idle time + disk latency.
> Checkpoint starts at 16:56 and ends at 17:13:
> [image: Screen Shot 2020-12-12 at 5.15.36 PM.png]
> [image: Screen Shot 2020-12-12 at 5.16.00 PM.png]
>
> [image: Screen Shot 2020-12-12 at 5.16.34 PM.png]
>
> [image: Screen Shot 2020-12-12 at 5.16.07 PM.png]
>
> On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <st...@gmail.com> wrote:
>
>> > things are actually moving pretty smoothly
>>
>> Do you mean the job is otherwise healthy? like there is no lag etc.
>>
>> Do you see any bottleneck at system level, like CPU, network, disk I/O
>> etc.?
>>
>> On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Hi,
>>>
>>> We're running a job with over 100 GiB of state. For our
>>> initial run we wanted to keep things simple, so we allocated a single core
>>> node with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between
>>> 4 disks on that machine). Overall, things are actually moving pretty
>>> smoothly, except for checkpointing. Checkpoints are set to be incremental,
>>> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
>>> updated in real-time, retracts+appends -- and they take around 10-30 min.
>>> We have the Taskmanager set to checkpoint every 5 min, which means we're
>>> spending the majority of our time just checkpointing.
>>>
>>> My question is, what sort of bottlenecks should we be investigating and
>>> what are some things we can try to improve our checkpoint times?
>>>
>>> Some things we're considering are:
>>> Increasing parallelism, hoping that this will partition the data and
>>> each operator will therefore checkpoint faster.
>>> Changing time between checkpoints, though we don't have a good
>>> understanding of how this might affect total time.
>>>
>>> Also, we are hesitant to use unaligned checkpointing at the moment and
>>> are hoping for some other options.
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Optimizing for super long checkpoint times

Posted by Rex Fenley <Re...@remind101.com>.
As in, yes: other than checkpoint latency, which is very long, everything is
healthy with no lag.

So a few observations: it appears that everything is waiting on 1 join to
finish during a checkpoint; most operators finish between 50 ms and 2 min.
The operator:
Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, uuid,
first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
organization_ids, teacher_organization_ids, admin_organization_ids,
user_id, owner_teacher_or_admin_archived_group_ids],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, uuid,
first_name, last_name, signature, prefix, birthdate, user_roles, group_ids,
organization_ids, teacher_organization_ids, admin_organization_ids,
owner_teacher_or_admin_archived_group_ids])

Also, while everything is waiting on that operator to finish, all our
resources seem basically idle, and then at the end there's a big spike in
network bytes out before operators go back to executing. You can see in the
first graph an initial spike where every operator but 1 presumably does its
checkpointing, a long idle time, then a final spike, then back to work.
Other graphs included as well to demonstrate idle time + disk latency.
Checkpoint starts at 16:56 and ends at 17:13:
[image: Screen Shot 2020-12-12 at 5.15.36 PM.png]
[image: Screen Shot 2020-12-12 at 5.16.00 PM.png]

[image: Screen Shot 2020-12-12 at 5.16.34 PM.png]

[image: Screen Shot 2020-12-12 at 5.16.07 PM.png]

On Sat, Dec 12, 2020 at 11:16 AM Steven Wu <st...@gmail.com> wrote:

> > things are actually moving pretty smoothly
>
> Do you mean the job is otherwise healthy? like there is no lag etc.
>
> Do you see any bottleneck at system level, like CPU, network, disk I/O
> etc.?
>
> On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:
>
>> Hi,
>>
>> We're running a job with over 100 GiB of state. For our
>> initial run we wanted to keep things simple, so we allocated a single core
>> node with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between
>> 4 disks on that machine). Overall, things are actually moving pretty
>> smoothly, except for checkpointing. Checkpoints are set to be incremental,
>> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
>> updated in real-time, retracts+appends -- and they take around 10-30 min.
>> We have the Taskmanager set to checkpoint every 5 min, which means we're
>> spending the majority of our time just checkpointing.
>>
>> My question is, what sort of bottlenecks should we be investigating and
>> what are some things we can try to improve our checkpoint times?
>>
>> Some things we're considering are:
>> Increasing parallelism, hoping that this will partition the data and each
>> operator will therefore checkpoint faster.
>> Changing time between checkpoints, though we don't have a good
>> understanding of how this might affect total time.
>>
>> Also, we are hesitant to use unaligned checkpointing at the moment and
>> are hoping for some other options.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Optimizing for super long checkpoint times

Posted by Steven Wu <st...@gmail.com>.
> things are actually moving pretty smoothly

Do you mean the job is otherwise healthy, e.g. there is no lag?

Do you see any bottleneck at the system level, like CPU, network, or disk I/O?

On Sat, Dec 12, 2020 at 10:54 AM Rex Fenley <Re...@remind101.com> wrote:

> Hi,
>
> We're running a job with over 100 GiB of state. For our initial
> run we wanted to keep things simple, so we allocated a single core node
> with 1 Taskmanager and 1 parallelism and 1 TiB storage (split between 4
> disks on that machine). Overall, things are actually moving pretty
> smoothly, except for checkpointing. Checkpoints are set to be incremental,
> yet they're all in the range of 10-20 GiB -- we do have a lot of data being
> updated in real-time, retracts+appends -- and they take around 10-30 min.
> We have the Taskmanager set to checkpoint every 5 min, which means we're
> spending the majority of our time just checkpointing.
>
> My question is, what sort of bottlenecks should we be investigating and
> what are some things we can try to improve our checkpoint times?
>
> Some things we're considering are:
> Increasing parallelism, hoping that this will partition the data and each
> operator will therefore checkpoint faster.
> Changing time between checkpoints, though we don't have a good
> understanding of how this might affect total time.
>
> Also, we are hesitant to use unaligned checkpointing at the moment and are
> hoping for some other options.
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>