You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2021/01/06 04:19:00 UTC

[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

    [ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17259399#comment-17259399 ] 

Jungtaek Lim edited comment on SPARK-33635 at 1/6/21, 4:18 AM:
---------------------------------------------------------------

I've spent some time to trace the issue, and noticed SPARK-29054 (+SPARK-30495) caused performance regression (though the patch itself is doing the right thing).

{code}
  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
    if (!_consumer.isDefined) {
      retrieveConsumer()
    }
    require(_consumer.isDefined, "Consumer must be defined")
    if (KafkaTokenUtil.needTokenUpdate(SparkEnv.get.conf, _consumer.get.kafkaParamsWithSecurity,
        _consumer.get.clusterConfig)) {
      logDebug("Cached consumer uses an old delegation token, invalidating.")
      releaseConsumer()
      consumerPool.invalidateKey(cacheKey)
      fetchedDataPool.invalidate(cacheKey)
      retrieveConsumer()
    }
    _consumer.get
  }
{code}

{code}
  def needTokenUpdate(
      sparkConf: SparkConf,
      params: ju.Map[String, Object],
      clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
    if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
        clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
      logDebug("Delegation token used by connector, checking if uses the latest token.")
      val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
      getTokenJaasParams(clusterConfig.get) != connectorJaasParams
    } else {
      false
    }
  }
{code}

{code}
  def isServiceEnabled(sparkConf: SparkConf, serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated.  Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }
{code}

With my test data and default config, Spark pulled 500 records per a poll from Kafka, which ended up "10,280,000" calls to get() which always calls getOrRetrieveConsumer(). A single call of KafkaTokenUtil.needTokenUpdate() wouldn't add significant overhead, but 10,000,000 calls make a significant difference. Assuming the case where delegation token is not applied, HadoopDelegationTokenManager.isServiceEnabled is the culprit on such huge overhead.

We could probably resolve the issue via short-term solution & long-term solution.

* short-term solution: change the order of check in needTokenUpdate, so that the performance hit is only affected when using delegation token. I'll raise a PR shortly.
* long-term solution(s): 1) optimize HadoopDelegationTokenManager.isServiceEnabled 2) find a way to reduce the occurrence of checking necessarily of token update.

Note that even with short-term solution, a slight performance hit is observed as it still does more things on the code path compared to Spark 2.4. Though I'd ignore it if it affects slightly, like less than 1%, or even slightly higher but the code addition is mandatory.


was (Author: kabhwan):
I've spent some time to trace the issue, and noticed SPARK-29054 (+SPARK-30495) caused performance regression (though the patch itself is doing the right thing).

{code}
  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
    if (!_consumer.isDefined) {
      retrieveConsumer()
    }
    require(_consumer.isDefined, "Consumer must be defined")
    if (KafkaTokenUtil.needTokenUpdate(SparkEnv.get.conf, _consumer.get.kafkaParamsWithSecurity,
        _consumer.get.clusterConfig)) {
      logDebug("Cached consumer uses an old delegation token, invalidating.")
      releaseConsumer()
      consumerPool.invalidateKey(cacheKey)
      fetchedDataPool.invalidate(cacheKey)
      retrieveConsumer()
    }
    _consumer.get
  }
{code}

{code}
  def needTokenUpdate(
      sparkConf: SparkConf,
      params: ju.Map[String, Object],
      clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
    if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
        clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
      logDebug("Delegation token used by connector, checking if uses the latest token.")
      val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
      getTokenJaasParams(clusterConfig.get) != connectorJaasParams
    } else {
      false
    }
  }
{code}

{code}
  def isServiceEnabled(sparkConf: SparkConf, serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated.  Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }
{code}

With my test data creator, Spark pulled 500 records per a poll from Kafka, which ended up "10,280,000" calls to get() which always calls getOrRetrieveConsumer(). A single call of KafkaTokenUtil.needTokenUpdate() wouldn't add significant overhead, but 10,000,000 calls make a significant difference. Assuming the case where delegation token is not applied, HadoopDelegationTokenManager.isServiceEnabled is the culprit on such huge overhead.

We could probably resolve the issue via short-term solution & long-term solution.

* short-term solution: change the order of check in needTokenUpdate, so that the performance hit is only affected when using delegation token. I'll raise a PR shortly.
* long-term solution(s): 1) optimize HadoopDelegationTokenManager.isServiceEnabled 2) find a way to reduce the occurrence of checking necessarily of token update.

Note that even with short-term solution, a slight performance hit is observed as it still does more things on the code path compared to Spark 2.4. Though I'd ignore it if it affects slightly, like less than 1%, or even slightly higher but the code addition is mandatory.

> Performance regression in Kafka read
> ------------------------------------
>
>                 Key: SPARK-33635
>                 URL: https://issues.apache.org/jira/browse/SPARK-33635
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.0.1
>         Environment: A simple 5 node system. A simple data row of csv data in kafka, evenly distributed between the partitions.
> Open JDK 1.8.0.252
> Spark in stand alone - 5 nodes, 10 workers (2 worker per node, each locked to a distinct NUMA group)
> kafka (v 2.3.1) cluster - 5 nodes (1 broker per node).
> Centos 7.7.1908
> 1 topic, 10 partiions, 1 hour queue life
> (this is just one of clusters we have, I have tested on all of them and theyall exhibit the same performance degredation)
>            Reporter: David Wyles
>            Priority: Major
>
> I have observed a slowdown in the reading of data from kafka on all of our systems when migrating from spark 2.4.5 to Spark 3.0.0 (and Spark 3.0.1)
> I have created a sample project to isolate the problem as much as possible, with just a read all data from a kafka topic (see [https://github.com/codegorillauk/spark-kafka-read] ).
> With 2.4.5, across multiple runs, 
>  I get a stable read rate of 1,120,000 (1.12 mill) rows per second
> With 3.0.0 or 3.0.1, across multiple runs,
>  I get a stable read rate of 632,000 (0.632 mil) rows per second
> The represents a *44% loss in performance*. Which is, a lot.
> I have been working though the spark-sql-kafka-0-10 code base, but change for spark 3 have been ongoing for over a year and its difficult to pin point an exact change or reason for the degradation.
> I am happy to help fix this problem, but will need some assitance as I am unfamiliar with the spark-sql-kafka-0-10 project.
>  
> A sample of the data my test reads (note: its not parsing csv - this is just test data)
>  1606921800000,001e0610e532,lightsense,tsl250rd,intensity,21853,53.262,acceleration_z,651,ep,290,commit,913,pressure,138,pm1,799,uv_intensity,823,idletime,-372,count,-72,ir_intensity,185,concentration,-61,flags,-532,tx,694.36,ep_heatsink,-556.92,acceleration_x,-221.40,fw,910.53,sample_flow_rate,-959.60,uptime,-515.15,pm10,-768.03,powersupply,214.72,magnetic_field_y,-616.04,alphasense,606.73,AoT_Chicago,053,Racine Ave & 18th St Chicago IL,41.857959,-87.65642700000002,AoT Chicago (S) [C],2017/12/15 00:00:00,



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org