Posted to dev@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2014/10/28 09:41:33 UTC
[jira] [Created] (FLINK-1195) Improvement of benchmarking infrastructure
Till Rohrmann created FLINK-1195:
------------------------------------
Summary: Improvement of benchmarking infrastructure
Key: FLINK-1195
URL: https://issues.apache.org/jira/browse/FLINK-1195
Project: Flink
Issue Type: Wish
Reporter: Till Rohrmann
I noticed while running my ALS benchmarks that we still have some potential to improve our benchmarking infrastructure. Currently we execute the benchmark jobs via a script with a single set of parameters, and the runtime is then retrieved manually from the web interface of Flink or Spark, respectively.
I think we need the following extensions:
* Automatic runtime retrieval and storage in a file
* Repeated execution of jobs to gather some "advanced" statistics such as mean and standard deviation of the runtimes
* Support for value sets for the individual parameters
The automatic runtime retrieval would allow us to execute several benchmarks consecutively without having to look up the runtimes in the logs or in the web interface, which, by the way, only stores the runtimes of the last 5 jobs.
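A minimal sketch of what the automatic retrieval could look like, assuming the client prints a "Job duration (in ms):" line as in the pact-client output quoted later in this thread (the job name, jar invocation, and results file name are placeholder assumptions):

```shell
#!/bin/bash
# Sketch only: capture a job's runtime from the client output and append it
# to a results file. The "Job duration (in ms):" line format, the job name,
# and the file name are assumptions for illustration.
RESULTS=runtimes.txt
# Stand-in for: out=$(bin/pact-client.sh run -j benchmark.jar -w -a "$@")
out="Job execution switched to status FINISHED
Job duration (in ms): 64628"
time_ms=$(echo "$out" | grep 'Job duration (in ms):' | awk '{print $5}')
echo "benchmark $time_ms" >> "$RESULTS"
```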
What I mean by value sets is that it would be nice to specify a set of parameter values for which the benchmark is run, without having to write a benchmark script for every single parameter combination. I believe this feature would come in very handy when we want to look at the runtime behaviour of Flink for different input sizes or degrees of parallelism, for example. To illustrate what I mean:
{code}
INPUTSIZE = 1000, 2000, 4000, 8000
DOP = 1, 2, 4, 8
OUTPUT=benchmarkResults
repetitions=10
command=benchmark.jar -p $DOP $INPUTSIZE
{code}
Something like that would execute the benchmark job with (DOP=1, INPUTSIZE=1000), (DOP=2, INPUTSIZE=2000), and so on, 10 times each, calculate runtime statistics for each parameter combination, and store the results in the file benchmarkResults.
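As a sketch of how such a value-set driver could work (the parameter pairing, output format, and placeholder job invocation are assumptions; repetitions are reduced for brevity, and whether value sets should be paired or fully crossed is an open design question — this sketch pairs them as in the example above):

```shell
#!/bin/bash
# Hypothetical sweep driver: run each (DOP, INPUTSIZE) pair several times,
# then compute per-combination mean and standard deviation with awk.
INPUTSIZES=(1000 2000 4000 8000)
DOPS=(1 2 4 8)
REPETITIONS=3
OUTPUT=benchmarkResults

for i in "${!DOPS[@]}"; do
  dop=${DOPS[$i]}; size=${INPUTSIZES[$i]}
  times=""
  for run in $(seq 1 $REPETITIONS); do
    start=$(($(date +%s%N) / 1000000))   # wall-clock start in ms
    sleep 0.01   # placeholder for: bin/flink run benchmark.jar -p $dop $size
    end=$(($(date +%s%N) / 1000000))
    times="$times $((end - start))"
  done
  # mean and (population) standard deviation over the collected runtimes
  echo $times | awk -v dop="$dop" -v size="$size" '
    { for (j = 1; j <= NF; j++) { s += $j; ss += $j * $j }
      m = s / NF; v = ss / NF - m * m; if (v < 0) v = 0
      printf "DOP=%s INPUTSIZE=%s mean=%.1f stddev=%.1f\n", dop, size, m, sqrt(v) }'
done > "$OUTPUT"
```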
I believe that spending some effort now will pay off in the long run because we will benchmark Flink continuously. What do you guys think?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [jira] [Created] (FLINK-1195) Improvement of benchmarking infrastructure
Posted by Stephan Ewen <se...@apache.org>.
Hi!
I totally agree.
I can contribute a bash script that makes various runs with different jobs,
parameters, and jar files. It automatically collects times and logs for the
runs.
Output:
Execution times (msecs):
KMeansPlainJava 64628 56234 62974 66003 66295
KMeansPlainScala 59961 53519 53922 54927 57295
KMeansSimNoKeySels 212684 239473 258493 205840 236463
KMeansImmutable_no_compact 206341 210160 233862 231071 225073
KMeansImmutable_compact 182459 189495 185829 196167 184058
KMeansImmutable_compact_assert 102597 96203 107883 96752 105110
KMeansMutable 95092 91662 103233 88992 93104
Script:
#!/bin/bash
STRATOSPHERE_HOME="/share/nephele/stratosphere-scala";
JOBS_DIR="$STRATOSPHERE_HOME/examples/pact4s/perfTests";
RESULT_DIR="/home/sewen/scalaExperiments";
TIMES_FILE="times.txt"
JOB_NAMES=("WordCountPlainJava" "WordCountPlainScala"
"WordCountSimNoKeySels" "WordCountImmutable_no_compact"
"WordCountImmutable_compact" "WordCountImmutable_compact_assert"
"WordCountMutable" "TPCHQuery3PlainJava" "TPCHQuery3PlainScala"
"TPCHQuery3SimNoKeySels" "TPCHQuery3Immutable_no_compact"
"TPCHQuery3Immutable_compact" "TPCHQuery3Immutable_compact_assert"
"TPCHQuery3Mutable" "KMeansPlainJava" "KMeansPlainScala"
"KMeansSimNoKeySels" "KMeansImmutable_no_compact" "KMeansImmutable_compact"
"KMeansImmutable_compact_assert" "KMeansMutable");
JOB_JARS=("pact4s-tests-0.2-WordCountPlainJava.jar"
"pact4s-tests-0.2-WordCountPlainScala.jar"
"pact4s-tests-0.2-WordCountSimNoKeySels.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountImmutable.jar"
"pact4s-tests-0.2-WordCountMutable.jar"
"pact4s-tests-0.2-TPCHQuery3PlainJava.jar"
"pact4s-tests-0.2-TPCHQuery3PlainScala.jar"
"pact4s-tests-0.2-TPCHQuery3SimNoKeySels.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Immutable.jar"
"pact4s-tests-0.2-TPCHQuery3Mutable.jar"
"pact4s-tests-0.2-KMeansPlainJava.jar"
"pact4s-tests-0.2-KMeansPlainScala.jar"
"pact4s-tests-0.2-KMeansSimNoKeySels.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansImmutable.jar"
"pact4s-tests-0.2-KMeansMutable.jar");
JOB_PARAMETERS=(
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum"
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum -nocompact -nohints"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum -nocompact -nohints"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum -nohints"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum"
"-subtasks 32 -input hdfs://cloud-7.dima.tu-berlin.de:40010/demo/lipsum -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_lipsum"
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH"
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH"
"-subtasks 32 -orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nocompact -nohints"
"-subtasks 32 -orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nocompact -nohints"
"-subtasks 32 -orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH -nohints"
"-subtasks 32 -orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH"
"-subtasks 32 -orders hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/orders -lineItems hdfs://cloud-7.dima.tu-berlin.de:40010/demo/tpch/scale100/lineitem -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/resultTPCH"
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1"
"32 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nocompact -nohints"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nocompact -nohints"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1 -nohints"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1"
"-subtasks 32 -numIterations 1 -dataPoints hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/datapoints -initialCenters hdfs://cloud-7.dima.tu-berlin.de:40010/demo/kmeans_2d/iter_0 -output hdfs://cloud-7.dima.tu-berlin.de:40010/demo/result_x_1"
);
NUM_RUNS=5;
echo "Beginning Tests..."
echo "Execution times (msecs):
" > $RESULT_DIR/$TIMES_FILE
for index in ${!JOB_NAMES[*]}
do
job=${JOB_NAMES[$index]};
jar=$JOBS_DIR/${JOB_JARS[$index]};
params=${JOB_PARAMETERS[$index]};
echo "Running $jar with arguments $params"
echo -n "$job " >> $RESULT_DIR/$TIMES_FILE
for test_run in `seq 1 $NUM_RUNS`;
do
res_dir="$RESULT_DIR/$job/$test_run";
mkdir -p "$res_dir"
echo "Run #$test_run"
echo "Restarting Nephele Cluster..."
$STRATOSPHERE_HOME/bin/stop-cluster.sh > /dev/null 2> /dev/null
sleep 10
rm -rf "$STRATOSPHERE_HOME/log"/*
$STRATOSPHERE_HOME/bin/start-cluster.sh > /dev/null 2> /dev/null
sleep 60
echo "Running Job..."
$STRATOSPHERE_HOME/bin/pact-client.sh run -j $jar -w -a $params > $res_dir/out.txt 2> $res_dir/err.txt
time_taken=`grep 'Job duration (in ms):' $res_dir/out.txt | awk '{print $5}'`
echo -n " $time_taken" >> $RESULT_DIR/$TIMES_FILE
echo "Copying files..."
cp $STRATOSPHERE_HOME/log/* $res_dir/;
done
echo '' >> $RESULT_DIR/$TIMES_FILE
done
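The times file this script writes could then be post-processed into the mean/standard deviation statistics the issue asks for. A hedged awk sketch (the two sample rows are taken from the output above; times.txt and stats.txt are assumed file names):

```shell
#!/bin/bash
# Sketch: turn "JobName t1 t2 ..." rows into per-job mean and (population)
# standard deviation. Input is a small sample of the times listed above.
cat > times.txt <<'EOF'
KMeansPlainJava 64628 56234 62974 66003 66295
KMeansPlainScala 59961 53519 53922 54927 57295
EOF
awk '{ s = 0; ss = 0
       for (i = 2; i <= NF; i++) { s += $i; ss += $i * $i }
       n = NF - 1; m = s / n; v = ss / n - m * m; if (v < 0) v = 0
       printf "%s mean=%.0f stddev=%.0f\n", $1, m, sqrt(v) }' times.txt > stats.txt
cat stats.txt
```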