Posted to user@kudu.apache.org by Frank Heimerzheim <fh...@gmail.com> on 2017/03/17 14:45:58 UTC

Timeout in spark with compute intensive operations and kudu as datasource

Hello,

we are using Cloudera: Kudu 1.20 with 5 tablet servers and 3 master servers
Spark 2.0 running on 5 worker nodes
Spark jobs started in yarn-client mode with Python 2.7.10 via spark2-submit

The code I'm running is essentially:

df = sqlContext.read.format('org.apache.kudu.spark.kudu')\
                    .option('kudu.master', self.kudu_master)\
                    .option('kudu.table', self.source_table)\
                    .load()
rdd = df.rdd.map(lambda x: compute_intensive_function(x))
print rdd.count()

After 2 to 10 minutes Spark reproducibly throws something like:

17/03/17 09:18:18 INFO scheduler.TaskSetManager: Starting task 2.1 in
stage 1.0 (TID 5, xx.xx.de, executor 19, partition 3, NODE_LOCAL, 6133
bytes)
17/03/17 09:18:18 INFO
cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 5 on
executor id: 19 hostname: xx.xx.de.

17/03/17 09:20:40 WARN scheduler.TaskSetManager: Lost task 2.1 in
stage 1.0 (TID 5, xx.xx.de, executor 19):
org.apache.kudu.client.NonRecoverableException: Scanner not found

	at org.apache.kudu.client.TabletClient.dispatchTSErrorOrReturnException(TabletClient.java:557)

In the logs on the Kudu tablet server, something like this appears:

I0317 10:10:33.199148 28240 scanners.cc:165] Expiring scanner id:
eb38ff5eb718426e85bc5bc22de7bdac, of tablet
4d316a9dd5424ceb91c91aff90290164, after 60314403 us of inactivity,
which is > TTL (60000000 us).
I0317 10:13:03.201413 28240 scanners.cc:165] Expiring scanner id:
00d5bec1906241ee8749e354665497e9, of tablet
4d316a9dd5424ceb91c91aff90290164, after 62854357 us of inactivity,
which is > TTL (60000000 us).
I0317 10:15:33.203809 28240 scanners.cc:165] Expiring scanner id:
cabb68f96756463989e0abdbad1bbaaf, of tablet
4d316a9dd5424ceb91c91aff90290164, after 61797695 us of inactivity,
which is > TTL (60000000 us).
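
For reading these messages in bulk, a small parser can help. The following is a minimal sketch that assumes the message format shown above; the regular expression and the function name expired_scanners are illustrative and not part of Kudu. It converts the microsecond values to seconds, which makes it easy to see that the 60000000 us TTL matches the 60 s default of --scanner_ttl_ms.

```python
import re

# Matches one "Expiring scanner" message, tolerating line wraps between fields.
EXPIRY_RE = re.compile(
    r"Expiring scanner id:\s*(?P<scanner>[0-9a-f]+), of tablet\s+"
    r"(?P<tablet>[0-9a-f]+), after\s+(?P<idle_us>\d+) us of inactivity,\s+"
    r"which is > TTL \((?P<ttl_us>\d+) us\)"
)


def expired_scanners(log_text):
    """Return (scanner_id, idle_seconds, ttl_seconds) per expiry message."""
    return [
        (
            m.group("scanner"),
            int(m.group("idle_us")) / 1e6,
            int(m.group("ttl_us")) / 1e6,
        )
        for m in EXPIRY_RE.finditer(log_text)
    ]


# First message from the tablet server log above, wrapped as in the mail.
sample = """I0317 10:10:33.199148 28240 scanners.cc:165] Expiring scanner id:
eb38ff5eb718426e85bc5bc22de7bdac, of tablet
4d316a9dd5424ceb91c91aff90290164, after 60314403 us of inactivity,
which is > TTL (60000000 us)."""

parsed = expired_scanners(sample)
```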

Using Parquet rather than Kudu as the data source

df = sqlContext.read.parquet('/tmp/test/foo')
rdd = df.rdd.map(lambda x: compute_intensive_function(x))
print rdd.count()

everything works fine, even if a single computation task takes
15 minutes or more.

In the Kudu flags I found this parameter:

--scanner_ttl_ms=60000

Raising it to a larger value helps. But as our computation can
take up to 90 minutes on one executor, I would have to set
scanner_ttl_ms to such a large value that I fear side effects I
can't estimate.

So my question is whether there are other parameters I should try to
alter, like

--scanner_batch_size_rows=100
--scanner_default_batch_size_bytes=1048576
--scanner_max_batch_size_bytes=8388608
--scanner_max_wait_ms=1000

or parameters within Spark which could affect the behaviour in a desirable way.

The number of tasks Spark creates depends on the number of
partitions in Kudu. It would probably help to raise the number of
tasks, since each task would then need less computation time, which
would lead to fewer errors like the one reported above. I don't know a
simple, performant way to raise the number of tasks at will, and I'm
not sure this approach would be desirable from a Spark point of
view. What would be an ideal number of Kudu partitions when we have
5*20 cores in the Spark computation nodes?
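
One way to raise the task count independently of the Kudu partitioning might be to repartition the DataFrame before the expensive map. The following is a minimal sketch, assuming df is the DataFrame loaded from Kudu above; the helper names are hypothetical, and the default of 2 tasks per core follows the 2-3 tasks per CPU core suggested in the Spark tuning guide. Since repartition() adds a shuffle, the stage that scans Kudu should only read rows and write them out, while the expensive function runs in a later stage. That comes at the cost of shuffling the whole table, so whether it pays off depends on the data volume.

```python
def suggested_task_count(nodes, cores_per_node, tasks_per_core=2):
    """Rule-of-thumb task count: a few tasks per available core."""
    return nodes * cores_per_node * tasks_per_core


def count_with_more_tasks(df, num_tasks, fn):
    """Spread the rows over num_tasks partitions, then apply fn and count.

    df is expected to be a Spark DataFrame. repartition() inserts a
    shuffle, so fn runs in a stage separate from the one scanning the source.
    """
    return df.repartition(num_tasks).rdd.map(fn).count()
```

With 5 nodes of 20 cores each, suggested_task_count(5, 20) gives 200, and the job from above would become count_with_more_tasks(df, 200, compute_intensive_function).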


Thank you for your help
Frank

Re: Timeout in spark with compute intensive operations and kudu as datasource

Posted by Alexey Serbin <as...@cloudera.com>.
Hi Frank,

Open scanners consume memory on the server side: they hold references
to the relevant data.  Especially if there is parallel write/ingest
activity on the same tables, setting the --scanner_ttl_ms flag to a very
high value might lead to excessive memory usage by tablet servers.  If
that happens, you could try to make scanners hold less memory for the
referenced data by specifying lower values for the
--scanner_default_batch_size_bytes and --scanner_max_batch_size_bytes flags.

However, you could specify a smaller batch size for KuduRDD objects to
cope with the default scanner TTL on the server side.  I.e., if the
client spends a lot of time processing each row, using smaller batches
for scan operations would result in fetching batches of rows from the
server more frequently, which updates the scanner's last-used timestamp,
so the scanner would not time out.  As I understand it, doing that would
spare you from having to change the default value of the --scanner_ttl_ms
flag on the server side.
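
As a rough illustration of how small a batch would need to be, here is a minimal sketch. It assumes the scanner's inactivity clock restarts each time the client fetches the next batch, as described above, and that the per-row processing time is roughly constant. The function name, the safety factor and the 2 seconds per row are hypothetical figures, not measurements from this cluster.

```python
import math


def max_rows_per_batch(scanner_ttl_ms, seconds_per_row, safety_factor=0.5):
    """Largest batch (in rows) that can be processed before the scanner TTL
    elapses, keeping some headroom via safety_factor.

    Returns 0 when even a single row exceeds the time budget; in that case
    a smaller batch alone cannot keep the scanner alive.
    """
    budget_seconds = scanner_ttl_ms / 1000.0 * safety_factor
    return int(math.floor(budget_seconds / seconds_per_row))


# Hypothetical figures: default 60 s TTL, 2 s of computation per row.
rows = max_rows_per_batch(60000, 2.0)
```

If a single row takes longer than the budget, the function returns 0, which indicates that batch size alone will not be enough and the TTL or the job structure has to change as well.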

I think --scanner_max_wait_ms is not relevant here: that's about
snapshot scans which are run to capture recent updates on the data (if
any happen in parallel with scans).

Hope this helps.


Best regards,

Alexey

