You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jakub Danilewicz <jd...@alto-analytics.com> on 2019/10/24 17:36:51 UTC

Flink 1.5+ performance in a Java standalone environment

Hi,

I have recently tried to upgrade Flink from 1.2.0 to the newest version and
noticed that starting from the version 1.5 the performance is much worse
when processing fixed graphs in a standalone JVM environment (Java 8).

This affects all the use-cases when a Gelly graph (pre-built from a fixed
collection of nodes/edges) gets processed by any of our custom algorithms
(VertexCentric, ScatterGather or GSA), especially when using parallel
processing for a local ExecutionEnvironment. The processing times (compared
to the versions <= 1.4.2) double/triple, while CPU and memory consumption
increase significantly.

Are there any fine-tuning steps/tricks for the job processing engine behind
Flink 1.5+ that would improve the performance in the scenarios described
above?

Best,

Jakub

Re: Flink 1.5+ performance in a Java standalone environment

Posted by Jakub Danilewicz <jd...@alto-analytics.com>.
Thanks for your reply, Till.

As mentioned above I execute graph processing in a straight-ahead Java standalone environment (no cluster underneath, no specific configuration except for parallelism), just as if you simply ran the Java class I pasted upthread with a Flink distribution JAR (plus Gelly and Slf4j/Log4j JARs) on its classpath. 

I do not know what goes on behind the scenes, but the "legacy" mode significantly outperforms the "new" one in every single case. The new mode is a few times slower, getting worse and worse with the increasing size of the graph.

As for setting "the maximum parallelism (== number of key groups) to a multiple of your parallelism", could you tell me which configuration option from the list below is it?

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html 

Best,

Jakub


On 2019/11/01 10:19:47, Till Rohrmann <tr...@apache.org> wrote: 
> Hi Jakub,
> 
> what are the cluster settings and the exact job settings you are running
> your job with? I'm asking because one difference between legacy and FLIP-6
> mode is that the legacy mode spreads out tasks across all available
> TaskManagers whereas the FLIP-6 mode tries to bin package them on as few
> TaskManagers as possible. If you have more slots than the parallelism of
> your job, then I could see how this could affect the performance of your
> job if it is not I/O bound but CPU bound. We will add an option to enable
> the old spread out strategy again [1].
> 
> Another reason why you might see a performance degradation is the placement
> of key groups. In the legacy mode, Flink distributed them so that two
> TaskManagers with the same number of tasks would only have at most one key
> group more. In FLIP-6 it can be up to the number of slots more key groups
> on one of the TaskManagers. In order to mitigate this problem I would
> recommend to set the maximum parallelism (== number of key groups) to a
> multiple of your parallelism.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12122
> 
> Cheers,
> Till
> 
> On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <
> jdanilewicz@alto-analytics.com> wrote:
> 
> > Hi,
> >
> > I can confirm that the performance drop is directly related to FLIP-6
> > changes. Applying this modification to the code posted above restores the
> > previous graph processing speed under Flink 1.5.6:
> >
> > ---------------------------------------------------------------------------
> >
> >     org.apache.flink.configuration.Configuration customConfig = new
> > org.apache.flink.configuration.Configuration();
> >     customConfig.setString("mode", "legacy");
> >     final ExecutionEnvironment env =
> > ExecutionEnvironment.createLocalEnvironment(customConfig);
> >     env.setParallelism(parallelism);
> >
> > ---------------------------------------------------------------------------
> >
> > Disabling the "taskmanager.network.credit-model" parameter in
> > Configuration provides only a very slight improvement in the performance
> > under Flink 1.5.6.
> >
> > Now the big question: what about newer versions where the legacy mode is
> > not supported anymore? I checked Flink 1.8.2 and it does not work.
> >
> > Is there any way to make the new mode as performant as the "legacy" one in
> > the standalone scenarios? Alternatively may we expect improvements in this
> > area in the upcoming releases?
> >
> > Best,
> >
> > Jakub
> >
> > On 2019/10/30 14:11:19, Piotr Nowojski <pi...@ververica.com> wrote:
> > > Hi,
> > >
> > > In Flink 1.5 there were three big changes, that could affect
> > performance.
> > > 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> > > 2. Credit base flow control (especially if you are using SSL)
> > > 3. Low latency network changes
> > >
> > > I would suspect them in that order. First and second you can disable via
> > configuration switches [1] and [2] respectively.
> > >
> > > [1] “mode:legacy"
> > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > <
> > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > >
> > > [2] "taskmanager.network.credit-model:false”
> > >
> > > Could you try disabling them out?
> > >
> > > Piotrek
> > >
> > > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <
> > jdanilewicz@alto-analytics.com> wrote:
> > > >
> > > > Thanks for your replies.
> > > >
> > > > We use Flink from within a standalone Java 8 application (no Hadoop,
> > no clustering), so it's basically boils down to running a simple code like
> > this:
> > > >
> > > > import java.util.*;
> > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > import org.apache.flink.graph.*;
> > > > import org.apache.flink.graph.library.CommunityDetection;
> > > >
> > > > public class FlinkTester {
> > > >    final Random random = new Random(1);
> > > >    final float density = 3.0F;
> > > >
> > > >    public static void main(String[] args) throws Exception {
> > > >        new FlinkTester().execute(1000000, 4);
> > > >    }
> > > >
> > > >    private void execute(int numEdges, int parallelism) throws
> > Exception {
> > > >        final ExecutionEnvironment env =
> > ExecutionEnvironment.createLocalEnvironment(parallelism);
> > > >        final Graph<Long, Long, Double> graph = createGraph(numEdges,
> > env);
> > > >
> > > >        final long start = System.currentTimeMillis();
> > > >        List<Vertex<Long, Long>> vertices = graph.run(new
> > CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> > > >        System.out.println(vertices.size() + " vertices processed in "
> > + (System.currentTimeMillis()-start)/1000 + " s");
> > > >    }
> > > >
> > > >    private Graph<Long, Long, Double> createGraph(int numEdges,
> > ExecutionEnvironment env) {
> > > >        System.out.println("Creating new graph of " + numEdges + "
> > edges...");
> > > >
> > > >        final int maxNumVertices = (int)(numEdges/density);
> > > >        final Map<Long, Vertex<Long, Long>> vertexMap = new
> > HashMap<>(maxNumVertices);
> > > >        final Map<String, Edge<Long, Double>> edgeMap = new
> > HashMap<>(numEdges);
> > > >
> > > >        while (edgeMap.size() < numEdges) {
> > > >            long sourceId = random.nextInt(maxNumVertices) + 1;
> > > >            long targetId = sourceId;
> > > >            while (targetId == sourceId)
> > > >                targetId = random.nextInt(maxNumVertices) + 1;
> > > >
> > > >            final String edgeKey = sourceId + "#" + targetId;
> > > >            if (!edgeMap.containsKey(edgeKey)) {
> > > >                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId,
> > 1D));
> > > >                if (!vertexMap.containsKey(sourceId))
> > > >                    vertexMap.put(sourceId, new Vertex<>(sourceId,
> > sourceId));
> > > >                if (!vertexMap.containsKey(targetId))
> > > >                    vertexMap.put(targetId, new Vertex<>(targetId,
> > targetId));
> > > >            }
> > > >        }
> > > >
> > > >        System.out.println(edgeMap.size() + " edges created between " +
> > vertexMap.size() + " vertices.");
> > > >        return Graph.fromCollection(vertexMap.values(),
> > edgeMap.values(), env);
> > > >    }
> > > > }
> > > >
> > > > No matter what graph algorithm you pick for benchmarking (above it's
> > CommunityDetection) the bigger the graph the wider performance gap (and
> > higher CPU/memory consumption) you observe when comparing the execution
> > times between the old engine (<= Flink 1.4.2) and the new one (checked on
> > 1.5.6, 1.8.2 and 1.9.1).
> > > >
> > > > Just run the code yourselves (you may play with the number of edges
> > and parallel threads).
> > > >
> > > > Best,
> > > >
> > > > Jakub
> > > >
> > >
> > >
> >
> 

Re: Flink 1.5+ performance in a Java standalone environment

Posted by Till Rohrmann <tr...@apache.org>.
Hi Jakub,

what are the cluster settings and the exact job settings you are running
your job with? I'm asking because one difference between legacy and FLIP-6
mode is that the legacy mode spreads out tasks across all available
TaskManagers whereas the FLIP-6 mode tries to bin package them on as few
TaskManagers as possible. If you have more slots than the parallelism of
your job, then I could see how this could affect the performance of your
job if it is not I/O bound but CPU bound. We will add an option to enable
the old spread out strategy again [1].

Another reason why you might see a performance degradation is the placement
of key groups. In the legacy mode, Flink distributed them so that two
TaskManagers with the same number of tasks would only have at most one key
group more. In FLIP-6 it can be up to the number of slots more key groups
on one of the TaskManagers. In order to mitigate this problem I would
recommend to set the maximum parallelism (== number of key groups) to a
multiple of your parallelism.

[1] https://issues.apache.org/jira/browse/FLINK-12122

Cheers,
Till

On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <
jdanilewicz@alto-analytics.com> wrote:

> Hi,
>
> I can confirm that the performance drop is directly related to FLIP-6
> changes. Applying this modification to the code posted above restores the
> previous graph processing speed under Flink 1.5.6:
>
> ---------------------------------------------------------------------------
>
>     org.apache.flink.configuration.Configuration customConfig = new
> org.apache.flink.configuration.Configuration();
>     customConfig.setString("mode", "legacy");
>     final ExecutionEnvironment env =
> ExecutionEnvironment.createLocalEnvironment(customConfig);
>     env.setParallelism(parallelism);
>
> ---------------------------------------------------------------------------
>
> Disabling the "taskmanager.network.credit-model" parameter in
> Configuration provides only a very slight improvement in the performance
> under Flink 1.5.6.
>
> Now the big question: what about newer versions where the legacy mode is
> not supported anymore? I checked Flink 1.8.2 and it does not work.
>
> Is there any way to make the new mode as performant as the "legacy" one in
> the standalone scenarios? Alternatively may we expect improvements in this
> area in the upcoming releases?
>
> Best,
>
> Jakub
>
> On 2019/10/30 14:11:19, Piotr Nowojski <pi...@ververica.com> wrote:
> > Hi,
> >
> > In Flink 1.5 there were three big changes, that could affect
> performance.
> > 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> > 2. Credit base flow control (especially if you are using SSL)
> > 3. Low latency network changes
> >
> > I would suspect them in that order. First and second you can disable via
> configuration switches [1] and [2] respectively.
> >
> > [1] “mode:legacy"
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> >
> > [2] "taskmanager.network.credit-model:false”
> >
> > Could you try disabling them out?
> >
> > Piotrek
> >
> > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <
> jdanilewicz@alto-analytics.com> wrote:
> > >
> > > Thanks for your replies.
> > >
> > > We use Flink from within a standalone Java 8 application (no Hadoop,
> no clustering), so it's basically boils down to running a simple code like
> this:
> > >
> > > import java.util.*;
> > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > import org.apache.flink.graph.*;
> > > import org.apache.flink.graph.library.CommunityDetection;
> > >
> > > public class FlinkTester {
> > >    final Random random = new Random(1);
> > >    final float density = 3.0F;
> > >
> > >    public static void main(String[] args) throws Exception {
> > >        new FlinkTester().execute(1000000, 4);
> > >    }
> > >
> > >    private void execute(int numEdges, int parallelism) throws
> Exception {
> > >        final ExecutionEnvironment env =
> ExecutionEnvironment.createLocalEnvironment(parallelism);
> > >        final Graph<Long, Long, Double> graph = createGraph(numEdges,
> env);
> > >
> > >        final long start = System.currentTimeMillis();
> > >        List<Vertex<Long, Long>> vertices = graph.run(new
> CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> > >        System.out.println(vertices.size() + " vertices processed in "
> + (System.currentTimeMillis()-start)/1000 + " s");
> > >    }
> > >
> > >    private Graph<Long, Long, Double> createGraph(int numEdges,
> ExecutionEnvironment env) {
> > >        System.out.println("Creating new graph of " + numEdges + "
> edges...");
> > >
> > >        final int maxNumVertices = (int)(numEdges/density);
> > >        final Map<Long, Vertex<Long, Long>> vertexMap = new
> HashMap<>(maxNumVertices);
> > >        final Map<String, Edge<Long, Double>> edgeMap = new
> HashMap<>(numEdges);
> > >
> > >        while (edgeMap.size() < numEdges) {
> > >            long sourceId = random.nextInt(maxNumVertices) + 1;
> > >            long targetId = sourceId;
> > >            while (targetId == sourceId)
> > >                targetId = random.nextInt(maxNumVertices) + 1;
> > >
> > >            final String edgeKey = sourceId + "#" + targetId;
> > >            if (!edgeMap.containsKey(edgeKey)) {
> > >                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId,
> 1D));
> > >                if (!vertexMap.containsKey(sourceId))
> > >                    vertexMap.put(sourceId, new Vertex<>(sourceId,
> sourceId));
> > >                if (!vertexMap.containsKey(targetId))
> > >                    vertexMap.put(targetId, new Vertex<>(targetId,
> targetId));
> > >            }
> > >        }
> > >
> > >        System.out.println(edgeMap.size() + " edges created between " +
> vertexMap.size() + " vertices.");
> > >        return Graph.fromCollection(vertexMap.values(),
> edgeMap.values(), env);
> > >    }
> > > }
> > >
> > > No matter what graph algorithm you pick for benchmarking (above it's
> CommunityDetection) the bigger the graph the wider performance gap (and
> higher CPU/memory consumption) you observe when comparing the execution
> times between the old engine (<= Flink 1.4.2) and the new one (checked on
> 1.5.6, 1.8.2 and 1.9.1).
> > >
> > > Just run the code yourselves (you may play with the number of edges
> and parallel threads).
> > >
> > > Best,
> > >
> > > Jakub
> > >
> >
> >
>

Re: Flink 1.5+ performance in a Java standalone environment

Posted by Jakub Danilewicz <jd...@alto-analytics.com>.
Hi,

I can confirm that the performance drop is directly related to FLIP-6 changes. Applying this modification to the code posted above restores the previous graph processing speed under Flink 1.5.6:

---------------------------------------------------------------------------

    org.apache.flink.configuration.Configuration customConfig = new org.apache.flink.configuration.Configuration();
    customConfig.setString("mode", "legacy");
    final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(customConfig);
    env.setParallelism(parallelism);

---------------------------------------------------------------------------

Disabling the "taskmanager.network.credit-model" parameter in Configuration provides only a very slight improvement in the performance under Flink 1.5.6.

Now the big question: what about newer versions where the legacy mode is not supported anymore? I checked Flink 1.8.2 and it does not work.

Is there any way to make the new mode as performant as the "legacy" one in the standalone scenarios? Alternatively may we expect improvements in this area in the upcoming releases?

Best,

Jakub

On 2019/10/30 14:11:19, Piotr Nowojski <pi...@ververica.com> wrote: 
> Hi,
> 
> In Flink 1.5 there were three big changes, that could affect performance. 
> 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> 2. Credit base flow control (especially if you are using SSL)
> 3. Low latency network changes
> 
> I would suspect them in that order. First and second you can disable via configuration switches [1] and [2] respectively.
> 
> [1] “mode:legacy" https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core <https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core>
> [2] "taskmanager.network.credit-model:false”
> 
> Could you try disabling them out?
> 
> Piotrek
> 
> > On 28 Oct 2019, at 14:10, Jakub Danilewicz <jd...@alto-analytics.com> wrote:
> > 
> > Thanks for your replies.
> > 
> > We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it's basically boils down to running a simple code like this:
> > 
> > import java.util.*;
> > import org.apache.flink.api.java.ExecutionEnvironment;
> > import org.apache.flink.graph.*;
> > import org.apache.flink.graph.library.CommunityDetection;
> > 
> > public class FlinkTester {
> >    final Random random = new Random(1);
> >    final float density = 3.0F;
> > 
> >    public static void main(String[] args) throws Exception {
> >        new FlinkTester().execute(1000000, 4);
> >    }
> > 
> >    private void execute(int numEdges, int parallelism) throws Exception {
> >        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
> >        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> > 
> >        final long start = System.currentTimeMillis();
> >        List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
> >        System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s");
> >    }
> > 
> >    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
> >        System.out.println("Creating new graph of " + numEdges + " edges...");
> > 
> >        final int maxNumVertices = (int)(numEdges/density);
> >        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
> >        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> > 
> >        while (edgeMap.size() < numEdges) {
> >            long sourceId = random.nextInt(maxNumVertices) + 1;
> >            long targetId = sourceId;
> >            while (targetId == sourceId)
> >                targetId = random.nextInt(maxNumVertices) + 1;
> > 
> >            final String edgeKey = sourceId + "#" + targetId;
> >            if (!edgeMap.containsKey(edgeKey)) {
> >                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
> >                if (!vertexMap.containsKey(sourceId))
> >                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
> >                if (!vertexMap.containsKey(targetId))
> >                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
> >            }
> >        }
> > 
> >        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
> >        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
> >    }
> > }
> > 
> > No matter what graph algorithm you pick for benchmarking (above it's CommunityDetection) the bigger the graph the wider performance gap (and higher CPU/memory consumption) you observe when comparing the execution times between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 1.9.1).
> > 
> > Just run the code yourselves (you may play with the number of edges and parallel threads).
> > 
> > Best,
> > 
> > Jakub
> > 
> 
> 

Re: Flink 1.5+ performance in a Java standalone environment

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

In Flink 1.5 there were three big changes, that could affect performance. 
1. FLIP-6 changes (As previously Yang and Fabian mentioned)
2. Credit base flow control (especially if you are using SSL)
3. Low latency network changes

I would suspect them in that order. First and second you can disable via configuration switches [1] and [2] respectively.

[1] “mode:legacy" https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core <https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core>
[2] "taskmanager.network.credit-model:false”

Could you try disabling them out?

Piotrek

> On 28 Oct 2019, at 14:10, Jakub Danilewicz <jd...@alto-analytics.com> wrote:
> 
> Thanks for your replies.
> 
> We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it's basically boils down to running a simple code like this:
> 
> import java.util.*;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.graph.*;
> import org.apache.flink.graph.library.CommunityDetection;
> 
> public class FlinkTester {
>    final Random random = new Random(1);
>    final float density = 3.0F;
> 
>    public static void main(String[] args) throws Exception {
>        new FlinkTester().execute(1000000, 4);
>    }
> 
>    private void execute(int numEdges, int parallelism) throws Exception {
>        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
>        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);
> 
>        final long start = System.currentTimeMillis();
>        List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
>        System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s");
>    }
> 
>    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
>        System.out.println("Creating new graph of " + numEdges + " edges...");
> 
>        final int maxNumVertices = (int)(numEdges/density);
>        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
>        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);
> 
>        while (edgeMap.size() < numEdges) {
>            long sourceId = random.nextInt(maxNumVertices) + 1;
>            long targetId = sourceId;
>            while (targetId == sourceId)
>                targetId = random.nextInt(maxNumVertices) + 1;
> 
>            final String edgeKey = sourceId + "#" + targetId;
>            if (!edgeMap.containsKey(edgeKey)) {
>                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
>                if (!vertexMap.containsKey(sourceId))
>                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
>                if (!vertexMap.containsKey(targetId))
>                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
>            }
>        }
> 
>        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
>        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
>    }
> }
> 
> No matter what graph algorithm you pick for benchmarking (above it's CommunityDetection) the bigger the graph the wider performance gap (and higher CPU/memory consumption) you observe when comparing the execution times between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 1.9.1).
> 
> Just run the code yourselves (you may play with the number of edges and parallel threads).
> 
> Best,
> 
> Jakub
> 


Re: Flink 1.5+ performance in a Java standalone environment

Posted by Jakub Danilewicz <jd...@alto-analytics.com>.
Thanks for your replies.

We use Flink from within a standalone Java 8 application (no Hadoop, no clustering), so it's basically boils down to running a simple code like this:

import java.util.*;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.*;
import org.apache.flink.graph.library.CommunityDetection;

public class FlinkTester {
    final Random random = new Random(1);
    final float density = 3.0F;

    public static void main(String[] args) throws Exception {
        new FlinkTester().execute(1000000, 4);
    }

    private void execute(int numEdges, int parallelism) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
        final Graph<Long, Long, Double> graph = createGraph(numEdges, env);

        final long start = System.currentTimeMillis();
        List<Vertex<Long, Long>> vertices = graph.run(new CommunityDetection<Long>(10, 0.5)).getVertices().collect();
        System.out.println(vertices.size() + " vertices processed in " + (System.currentTimeMillis()-start)/1000 + " s");
    }

    private Graph<Long, Long, Double> createGraph(int numEdges, ExecutionEnvironment env) {
        System.out.println("Creating new graph of " + numEdges + " edges...");

        final int maxNumVertices = (int)(numEdges/density);
        final Map<Long, Vertex<Long, Long>> vertexMap = new HashMap<>(maxNumVertices);
        final Map<String, Edge<Long, Double>> edgeMap = new HashMap<>(numEdges);

        while (edgeMap.size() < numEdges) {
            long sourceId = random.nextInt(maxNumVertices) + 1;
            long targetId = sourceId;
            while (targetId == sourceId)
                targetId = random.nextInt(maxNumVertices) + 1;

            final String edgeKey = sourceId + "#" + targetId;
            if (!edgeMap.containsKey(edgeKey)) {
                edgeMap.put(edgeKey, new Edge<>(sourceId, targetId, 1D));
                if (!vertexMap.containsKey(sourceId))
                    vertexMap.put(sourceId, new Vertex<>(sourceId, sourceId));
                if (!vertexMap.containsKey(targetId))
                    vertexMap.put(targetId, new Vertex<>(targetId, targetId));
            }
        }

        System.out.println(edgeMap.size() + " edges created between " + vertexMap.size() + " vertices.");
        return Graph.fromCollection(vertexMap.values(), edgeMap.values(), env);
    }
}

No matter what graph algorithm you pick for benchmarking (above it's CommunityDetection) the bigger the graph the wider performance gap (and higher CPU/memory consumption) you observe when comparing the execution times between the old engine (<= Flink 1.4.2) and the new one (checked on 1.5.6, 1.8.2 and 1.9.1).

Just run the code yourselves (you may play with the number of edges and parallel threads).

Best,

Jakub


Re: Flink 1.5+ performance in a Java standalone environment

Posted by Yang Wang <da...@gmail.com>.
Hi Jakub,

Do you have multiple task managers in your standalone environment? If it
is, after FLIP-6,
we could not guarantee to distribute the tasks across multiple TMs evenly.
And this may cause
some nodes are in heavy utilization. Could you check the task distribution?

This is a known issue[1].

Best,
Yang

[1] https://issues.apache.org/jira/browse/FLINK-11815

Fabian Hueske <fh...@gmail.com> 于2019年10月25日周五 下午11:49写道:

> Hi Jakub,
>
> I had a look at the changes of Flink 1.5 [1] and didn't find anything
> obvious.
> Something that might cause a different behavior is the new deployment and
> process model (FLIP-6).
>
> In Flink 1.5, there is a switch to disable it and use the previous
> deployment mechanism.
> You could try to disable the new new model [2] and see if this cause the
> performance issue.
>
> Note that the legacy mode was removed in one of the later versions.
>
> Best, Fabian
>
> [1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment
>
> Am Do., 24. Okt. 2019 um 19:37 Uhr schrieb Jakub Danilewicz <
> jdanilewicz@alto-analytics.com>:
>
>> Hi,
>>
>> I have recently tried to upgrade Flink from 1.2.0 to the newest version
>> and noticed that starting from the version 1.5 the performance is much
>> worse when processing fixed graphs in a standalone JVM environment (Java
>> 8).
>>
>> This affects all the use-cases when a Gelly graph (pre-built from a fixed
>> collection of nodes/edges) gets processed by any of our custom algorithms
>> (VertexCentric, ScatterGather or GSA), especially when using parallel
>> processing for a local ExecutionEnvironment. The processing times
>> (compared to the versions <= 1.4.2) double/triple, while CPU and memory
>> consumption increase significantly.
>>
>> Are there any fine-tuning steps/tricks for the job processing engine
>> behind Flink 1.5+ that would improve the performance in the scenarios
>> described above?
>>
>> Best,
>>
>> Jakub
>>
>

Re: Flink 1.5+ performance in a Java standalone environment

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Jakub,

I had a look at the changes of Flink 1.5 [1] and didn't find anything
obvious.
Something that might cause a different behavior is the new deployment and
process model (FLIP-6).

In Flink 1.5, there is a switch to disable it and use the previous
deployment mechanism.
You could try to disable the new new model [2] and see if this cause the
performance issue.

Note that the legacy mode was removed in one of the later versions.

Best, Fabian

[1] https://flink.apache.org/news/2018/05/25/release-1.5.0.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html#update-configuration-for-reworked-job-deployment

Am Do., 24. Okt. 2019 um 19:37 Uhr schrieb Jakub Danilewicz <
jdanilewicz@alto-analytics.com>:

> Hi,
>
> I have recently tried to upgrade Flink from 1.2.0 to the newest version
> and noticed that starting from the version 1.5 the performance is much
> worse when processing fixed graphs in a standalone JVM environment (Java
> 8).
>
> This affects all the use-cases when a Gelly graph (pre-built from a fixed
> collection of nodes/edges) gets processed by any of our custom algorithms
> (VertexCentric, ScatterGather or GSA), especially when using parallel
> processing for a local ExecutionEnvironment. The processing times
> (compared to the versions <= 1.4.2) double/triple, while CPU and memory
> consumption increase significantly.
>
> Are there any fine-tuning steps/tricks for the job processing engine
> behind Flink 1.5+ that would improve the performance in the scenarios
> described above?
>
> Best,
>
> Jakub
>