You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/11 04:47:59 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #26845: [SPARK-21869][SS] Revise Kafka producer pool to implement 'expire' correctly

HeartSaVioR opened a new pull request #26845: [SPARK-21869][SS] Revise Kafka producer pool to implement 'expire' correctly
URL: https://github.com/apache/spark/pull/26845
 
 
   ### What changes were proposed in this pull request?
   
   This patch revises Kafka producer pool (cache) to implement 'expire' correctly.
   
   Current implementation of Kafka producer cache leverages Guava cache, which decides cached producer instance to be expired if the instance is not "accessed" from cache. The behavior defines expiration time as "last accessed time + timeout", which is incorrect because some task may use the instance longer than timeout. There's no concept of "returning" in Guava cache as well, so it cannot be fixed with Guava cache.
   
   This patch introduces a new pool implementation which tracks "reference count" of cached instance, and defines expiration time for the instance as "last returned time + timeout" if the reference count goes 0, otherwise Long.MaxValue (effectively no expire). Expiring instances will be done with evict thread explicitly instead of evicting in part of handling acquire.
   
   This patch also creates a new package `producer` under `kafka010`, to hide the details from `kafka010` package. In point of `kafka010` package's view, only acquire()/release()/reset() are available in pool, and even for CachedKafkaProducer the package cannot close the producer directly.
   
   ### Why are the changes needed?
   
   Explained above.
   
   ### Does this PR introduce any user-facing change?
   
   Yes, but only for the way of expiring cached instances. (The difference is described above.) Each executor leveraging spark-sql-kafka would have one eviction thread.
   
   ### How was this patch tested?
   
   New and existing UTs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org