You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by bjkonglu <gi...@git.apache.org> on 2018/07/05 09:14:30 UTC

[GitHub] spark pull request #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the Spar...

GitHub user bjkonglu opened a pull request:

    https://github.com/apache/spark/pull/21718

    [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSession configuratiā€¦

    # Background
    When I use structured streaming to construct my application, there is something odd! The application always set option [spark.sql.shuffle.partitions] to default value [200]. Even though, I set [spark.sql.shuffle.partitions] to other value by SparkConf or --conf spark.sql.shuffle.partitions=100,  but it doesn't work. The option value is default value as before.
    # Analyse
    I review the relevant code. The relevant code is in [org.apache.spark.sql.execution.streaming.OffsetSeqMetadata].
    
    ```scala
    /** Set the SparkSession configuration with the values in the metadata */
      def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
        OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
    
          metadata.conf.get(confKey) match {
    
            case Some(valueInMetadata) =>
              // Config value exists in the metadata, update the session config with this value
              val optionalValueInSession = sessionConf.getOption(confKey)
              if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
                logWarning(s"Updating the value of conf '$confKey' in current session from " +
                  s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
              }
              sessionConf.set(confKey, valueInMetadata)
    
            case None =>
              // For backward compatibility, if a config was not recorded in the offset log,
              // then log it, and let the existing conf value in SparkSession prevail.
              logWarning (s"Conf '$confKey' was not found in the offset log, using existing value")
          }
        }
      }
    ```
    In this code, we can find it always set some option in metadata value. But as user, we want to those option can set by user. So I changed this code.
    
    ```scala
    /** Set the SparkSession configuration with the values in the metadata */
      def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
        OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
    
          metadata.conf.get(confKey) match {
    
            case Some(valueInMetadata) =>
              // Config value exists in the metadata, update the session config with this value
              val optionalValueInSession = sessionConf.getOption(confKey)
    
              if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
                sessionConf.set(confKey, optionalValueInSession.get)
              } else {
                logWarning(s"Updating the value of conf '$confKey' in current session from " +
                  s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
                sessionConf.set(confKey, valueInMetadata)
              }
    
            case None =>
              // For backward compatibility, if a config was not recorded in the offset log,
              // then log it, and let the existing conf value in SparkSession prevail.
              logWarning (s"Conf '$confKey' was not found in the offset log, using existing value")
          }
        }
      }
    ```
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bjkonglu/spark OffsetSeq

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21718.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21718
    
----
commit 941f123f0a6b9c013394a1004d79740debcc17fe
Author: bjkonglu <ko...@...>
Date:   2018-07-05T08:58:04Z

    [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSession configuration with user set if there are values in metadata and values in user sets.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by bjkonglu <gi...@git.apache.org>.
Github user bjkonglu commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Thx @HeartSaVioR .I am looking forward to your solution.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by wguangliang <gi...@git.apache.org>.
Github user wguangliang commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    it is neccessary that user can restart the spark app and change the configuration without deleting the checkpoint


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    I'm aware of this issue and have it in my backlog, but for now it doesn't look like easy to address in efficient way. I'll propose an approach for rescaling state when I get one.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by bethunebtj <gi...@git.apache.org>.
Github user bethunebtj commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Follow.
    I meet this problem too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    @bjkonglu @bethunebtj @wguangliang 
    
    Update: I thought about splitting execution tasks and data partitions (`spark.sql.shuffle.partitions`), and turned out it can be achieved by calling `coalesce`. With `coalesce` you can reduce execution tasks whereas the number of data partitions is kept same. Please note that we still can't change `spark.sql.shuffle.partitions`, since repartitioning state will not be trivial according to the size of the state.
    
    One thing to note is that execution tasks will be reduced even for downstream operators (unless there's a new stage), so you need to call `repartition` to adjust execution tasks for downstream.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    It has been fairly easy to rescale partitions before stateful operators came into play. For structured streaming, it is now not a trivial thing, cause rescaling partitions should also handle rescaling of state which is stored to disk. Rescaling state may require reading whole states and redistribute via hash function, and resave to disk again. That's why SS stores previous conf. and force using it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21718
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org