Posted to commits@spark.apache.org by we...@apache.org on 2016/09/14 17:34:03 UTC

spark git commit: [MINOR][SQL] Add missing functions for some options in SQLConf and use them where applicable

Repository: spark
Updated Branches:
  refs/heads/master 6d06ff6f7 -> a79838bde


[MINOR][SQL] Add missing functions for some options in SQLConf and use them where applicable

## What changes were proposed in this pull request?

I initially assumed these accessors were left out on purpose because the options are somewhat hidden, but it seems they are simply missing.

For example, `spark.sql.parquet.mergeSchema` is documented in [sql-programming-guide.md](https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md) but has no corresponding accessor function, whereas many options such as `spark.sql.join.preferSortMergeJoin` are not documented yet each have their own accessor.

So, this PR makes them consistent by adding the missing accessor functions for some options in `SQLConf` and using them where applicable, so that call sites become more readable. The pattern is sketched below.
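
Here is a minimal, self-contained Scala sketch of that pattern; `ConfigEntry`, `MiniConf`, and the option name below are illustrative stand-ins, not Spark's actual internal classes. The idea is that call sites read an option through a typed accessor instead of repeating a raw `getConf(...)` lookup.

```scala
object AccessorSketch extends App {

  // Illustrative stand-in for a typed config entry (key plus default value).
  final case class ConfigEntry[T](key: String, default: T)

  class MiniConf(settings: Map[String, String]) {
    // Generic lookup: the caller must know the entry object and do the lookup itself.
    def getConf(entry: ConfigEntry[Boolean]): Boolean =
      settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)

    val MERGE_SCHEMA: ConfigEntry[Boolean] =
      ConfigEntry("example.parquet.mergeSchema", default = false)

    // Typed accessor in the style this patch adds to SQLConf.
    def isSchemaMergingEnabled: Boolean = getConf(MERGE_SCHEMA)
  }

  val conf = new MiniConf(Map("example.parquet.mergeSchema" -> "true"))
  // Call sites read like a field instead of repeating getConf(MERGE_SCHEMA) everywhere.
  println(conf.isSchemaMergingEnabled) // prints: true
}
```

In the actual patch, the accessors live in `SQLConf` (for example `isParquetSchemaMergingEnabled` or `dataFramePivotMaxValues`) and call sites reach them via `sparkSession.sessionState.conf`, as the diff below shows.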

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14678 from HyukjinKwon/sqlconf-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a79838bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a79838bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a79838bd

Branch: refs/heads/master
Commit: a79838bdeeb12cec4d50da3948bd8a33777e53a6
Parents: 6d06ff6
Author: hyukjinkwon <gu...@gmail.com>
Authored: Thu Sep 15 01:33:56 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 15 01:33:56 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/RelationalGroupedDataset.scala    |  2 +-
 .../spark/sql/execution/QueryExecution.scala    |  2 +-
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |  2 +-
 .../PartitioningAwareFileCatalog.scala          |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala |  8 ++--
 .../datasources/parquet/ParquetOptions.scala    |  2 +-
 .../execution/streaming/FileStreamSinkLog.scala |  6 +--
 .../execution/streaming/StreamExecution.scala   |  2 +-
 .../streaming/state/StateStoreConf.scala        |  6 +--
 .../org/apache/spark/sql/internal/SQLConf.scala | 42 +++++++++++++++-----
 .../sql/streaming/StreamingQueryManager.scala   |  4 +-
 12 files changed, 49 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 53d7324..6c3fe07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -313,7 +313,7 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: String): RelationalGroupedDataset = {
     // This is to prevent unintended OOM errors when the number of distinct values is large
-    val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+    val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
     // Get the distinct values of the column and sort them so its consistent
     val values = df.select(pivotColumn)
       .distinct()

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index d484563..383b3a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   def assertSupported(): Unit = {
-    if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
       UnsupportedOperationChecker.checkForBatch(analyzed)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 825c013..93154bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -231,7 +231,7 @@ case class DataSource(
           }
         }
 
-        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 02ce7fa..99ca3df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
             dataColumns = dataColumns,
             inputSchema = query.output,
             PartitioningUtils.DEFAULT_PARTITION_NAME,
-            sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+            sparkSession.sessionState.conf.partitionMaxFiles,
             isAppend)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index cef9d4d..d2d5b56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
         PartitioningUtils.parsePartitions(
           leafDirs,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
           basePaths = basePaths)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9208c82..e7c3545 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,7 +151,7 @@ class ParquetFileFormat
     // Should we merge schemas from all Parquet part-files?
     val shouldMergeSchemas = parquetOptions.mergeSchema
 
-    val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+    val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
 
     val filesByType = splitFiles(files)
 
@@ -308,14 +308,14 @@ class ParquetFileFormat
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
-      if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      if (sparkSession.sessionState.conf.parquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 3eec582..6157318 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
   val mergeSchema: Boolean = parameters
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+    .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 7520163..6f9f7c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
    * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
    * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
    */
-  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
-  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+  private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
 
-  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5e1e5ee..a1aae61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index e55f63a..de72f1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   def this() = this(new SQLConf)
 
-  import SQLConf._
+  val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
-
-  val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
 }
 
 private[streaming] object StateStoreConf {

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1d6ca5a..428032b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -338,11 +338,6 @@ object SQLConf {
       .intConf
       .createWithDefault(4000)
 
-  val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
-    .doc("When true, automatically discover data partitions.")
-    .booleanConf
-    .createWithDefault(true)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
     SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -391,8 +386,10 @@ object SQLConf {
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The degree of parallelism for schema merging and partition discovery of " +
-        "Parquet data sources.")
+      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
+        "of detected files exceeds this value during partition discovery, it tries to list the " +
+        "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
+        "LibSVM data sources.")
       .intConf
       .createWithDefault(32)
 
@@ -592,8 +589,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
 
+  def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+
+  def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
+  def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+
+  def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
+
+  def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+
+  def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+
+  def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
+
+  def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
@@ -657,6 +670,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
 
+  def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
+
+  def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
+  def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
+
   def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 
   def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
@@ -673,12 +692,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
-  def partitionDiscoveryEnabled(): Boolean =
-    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
-
-  def partitionColumnTypeInferenceEnabled(): Boolean =
+  def partitionColumnTypeInferenceEnabled: Boolean =
     getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
+  def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
+
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
@@ -695,6 +713,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
+
   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

http://git-wip-us.apache.org/repos/asf/spark/blob/a79838bd/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index bae7f56..bba7bc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
         new Path(userSpecified).toUri.toString
       }.orElse {
-        df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+        df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
           new Path(location, name).toUri.toString
         }
       }.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       val analyzedPlan = df.queryExecution.analyzed
       df.queryExecution.assertAnalyzed()
 
-      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+      if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
         UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
       }
 

