You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2014/10/28 09:41:33 UTC

[jira] [Created] (FLINK-1195) Improvement of benchmarking infrastructure

Till Rohrmann created FLINK-1195:
------------------------------------

             Summary: Improvement of benchmarking infrastructure
                 Key: FLINK-1195
                 URL: https://issues.apache.org/jira/browse/FLINK-1195
             Project: Flink
          Issue Type: Wish
            Reporter: Till Rohrmann


I noticed while running my ALS benchmarks that we still have some potential to improve our benchmarking infrastructure. The current state is that we execute the benchmark jobs by writing a script with a single set of parameters. The runtime is then manually retrieved from the web interface of Flink and Spark, respectively.

I think we need the following extensions:

* Automatic runtime retrieval and storage in a file
* Repeated execution of jobs to gather some "advanced" statistics such as mean and standard deviation of the runtimes
* Support for value sets for the individual parameters

The automatic runtime retrieval would allow us to execute several benchmarks consecutively without having to lookup the runtimes in the logs or in the web interface, which btw only stores the runtimes of the last 5 jobs.

What I mean with value sets is that would be nice to specify a set of parameter values for which the benchmark is run without having to write for every single parameter combination a benchmark script. I believe that this feature would become very handy when we want to look at the runtime behaviour of Flink for different input sizes or degrees of parallelism, for example. To illustrate what I mean:

{code}
INPUTSIZE = 1000, 2000, 4000, 8000
DOP = 1, 2, 4, 8
OUTPUT=benchmarkResults
repetitions=10
command=benchmark.jar -p $DOP $INPUTSIZE 
{code} 

Something like that would execute the benchmark job with (DOP=1, INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000),.... 10 times each, calculate for each parameter combination runtime statistics and store the results in the file benchmarkResults.

I believe that spending some effort now will pay off in the long run because we will benchmark Flink continuously. What do you guys think?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Re: [jira] [Created] (FLINK-1195) Improvement of benchmarking infrastructure

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I totally agree.

I can contribute a bash script that makes various runs with different jobs,
different parameters and jar files:
It automatically collects times and logs for the runs.

Output:

Execution times (msecs):

KMeansPlainJava  64628 56234 62974 66003 66295
KMeansPlainScala  59961 53519 53922 54927 57295
KMeansSimNoKeySels  212684 239473 258493 205840 236463
KMeansImmutable_no_compact  206341 210160 233862 231071 225073
KMeansImmutable_compact  182459 189495 185829 196167 184058
KMeansImmutable_compact_assert  102597 96203 107883 96752 105110
KMeansMutable  95092 91662 103233 88992 93104




Script:

#!/bin/bash

STRATOSPHERE_HOME="/share/nephele/stratosphere-scala";
JOBS_DIR="$STRATOSPHERE_HOME/examples/pact4s/perfTests";
RESULT_DIR="/home/sewen/scalaExperiments";
TIMES_FILE="times.txt"

JOB_NAMES=("WordCountPlainJava" "WordCountPlainScala"
"WordCountSimNoKeySels" "WordCountImmutable_no_compact"
"WordCountImmutable_compact" "WordCountImmutable_compact_assert"
"WordCountMutable" "TPCHQuery3PlainJava" "TPCHQuery3PlainScala"
"TPCHQuery3SimNoKeySels" "TPCHQuery3Immutable_no_compact"
"TPCHQuery3Immutable_compact" "TPCHQuery3Immutable_compact_assert"
"TPCHQuery3Mutable" "KMeansPlainJava" "KMeansPlainScala"
"KMeansSimNoKeySels" "KMeansImmutable_no_compact" "KMeansImmutable_compact"
"KMeansImmutable_compact_assert" "KMeansMutable");

JOB_JARS=("pact4s-tests-0.2-WordCountPlainJava.jar"
"pact4s-tests-0.2-WordCountPlainScala.jar"
"pact4s-tests-0.2-WordCountSimNoKeySels.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountMutable.jar"
"pact4s-tests-0.2-TPCHQuery3PlainJava.jar"
"pact4s-tests-0.2-TPCHQuery3PlainScala.jar"
"pact4s-tests-0.2-TPCHQuery3SimNoKeySels.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Mutable.jar"
"pact4s-tests-0.2-KMeansPlainJava.jar"
"pact4s-tests-0.2-KMeansPlainScala.jar"
"pact4s-tests-0.2-KMeansSimNoKeySels.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansMutable.jar");

JOB_PARAMETERS=("32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum" "32 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/lipsum hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum" "-subtasks 32 -input
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum -nocompact -nohints"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum
-output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum
-nocompact -nohints" "-subtasks 32 -input hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum -nohints" "-subtasks 32
-input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum" "-subtasks 32 -input
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum" "32 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH" "32 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH" "-subtasks 32 -orders
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nocompact -nohints"
"-subtasks 32 -orders hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nocompact -nohints"
"-subtasks 32 -orders hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nohints" "-subtasks 32
-orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders
-lineItems hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH" "-subtasks 32 -orders
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH" "32 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1" "32 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1" "-subtasks 32
-numIterations 1 -dataPoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nocompact -nohints"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nocompact -nohints"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nohints" "-subtasks 32
-numIterations 1 -dataPoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1" "-subtasks 32
-numIterations 1 -dataPoints hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters
hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://
cloud-7.dima.tu-berlin.de:40010/demo/result_x_1");

NUM_RUNS=5;

echo "Beginning Tests..."
echo "Execution times (msecs):
" > $RESULT_DIR/$TIMES_FILE

for index in ${!JOB_NAMES[*]}
do
   job=${JOB_NAMES[$index]};
   jar=$JOBS_DIR/${JOB_JARS[$index]};
   params=${JOB_PARAMETERS[$index]};

   echo "Running $jar with arguments $params as $run_name"
   echo -n "$job " >> $RESULT_DIR/$TIMES_FILE

   for test_run in `seq 1 $NUM_RUNS`;
   do
      res_dir="$RESULT_DIR/"$job"_$run_name/$test_run";
      mkdir -p "$res_dir"
      echo "Run #$test_run"

      echo "Restarting Nephele Cluster..."
      $STRATOSPHERE_HOME/bin/stop-cluster.sh > /dev/null 2> /dev/null
      sleep 10
      rm -rf "$STRATOSPHERE_HOME/log/*"
      $STRATOSPHERE_HOME/bin/start-cluster.sh > /dev/null 2> /dev/null
      sleep 60

      echo "Running Job..."
      $STRATOSPHERE_HOME/bin/pact-client.sh run -j $jar -w -a $params >
$res_dir/out.txt 2> $res_dir/err.txt

      time_taken=`cat $res_dir/out.txt | grep 'Job duration (in ms):' | awk
'{print $5}'`;
      echo -n " $time_taken" >> $RESULT_DIR/$TIMES_FILE

      echo "Copying files..."
      cp $STRATOSPHERE_HOME/log/* $res_dir/;
   done

   echo '' >> $RESULT_DIR/$TIMES_FILE
done





On Tue, Oct 28, 2014 at 9:41 AM, Till Rohrmann (JIRA) <ji...@apache.org>
wrote:

> Till Rohrmann created FLINK-1195:
> ------------------------------------
>
>              Summary: Improvement of benchmarking infrastructure
>                  Key: FLINK-1195
>                  URL: https://issues.apache.org/jira/browse/FLINK-1195
>              Project: Flink
>           Issue Type: Wish
>             Reporter: Till Rohrmann
>
>
> I noticed while running my ALS benchmarks that we still have some
> potential to improve our benchmarking infrastructure. The current state is
> that we execute the benchmark jobs by writing a script with a single set of
> parameters. The runtime is then manually retrieved from the web interface
> of Flink and Spark, respectively.
>
> I think we need the following extensions:
>
> * Automatic runtime retrieval and storage in a file
> * Repeated execution of jobs to gather some "advanced" statistics such as
> mean and standard deviation of the runtimes
> * Support for value sets for the individual parameters
>
> The automatic runtime retrieval would allow us to execute several
> benchmarks consecutively without having to lookup the runtimes in the logs
> or in the web interface, which btw only stores the runtimes of the last 5
> jobs.
>
> What I mean with value sets is that would be nice to specify a set of
> parameter values for which the benchmark is run without having to write for
> every single parameter combination a benchmark script. I believe that this
> feature would become very handy when we want to look at the runtime
> behaviour of Flink for different input sizes or degrees of parallelism, for
> example. To illustrate what I mean:
>
> {code}
> INPUTSIZE = 1000, 2000, 4000, 8000
> DOP = 1, 2, 4, 8
> OUTPUT=benchmarkResults
> repetitions=10
> command=benchmark.jar -p $DOP $INPUTSIZE
> {code}
>
> Something like that would execute the benchmark job with (DOP=1,
> INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000),.... 10 times each, calculate for
> each parameter combination runtime statistics and store the results in the
> file benchmarkResults.
>
> I believe that spending some effort now will pay off in the long run
> because we will benchmark Flink continuously. What do you guys think?
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>