You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/09/21 05:36:28 UTC

spark git commit: [SPARK-15698][SQL][STREAMING][FOLLOW-UP] Fix FileStream source and sink log get configuration issue

Repository: spark
Updated Branches:
  refs/heads/master 1ea49916a -> e48ebc4e4


[SPARK-15698][SQL][STREAMING][FOLLOW-UP] Fix FileStream source and sink log get configuration issue

## What changes were proposed in this pull request?

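This issue was introduced by the previous commit for SPARK-15698, which mistakenly changed the way the FileStream source and sink logs read their configuration back to the original approach (`sparkSession.conf.get(...)`). This follow-up PR reverts that change so the values are read through `sparkSession.sessionState.conf` again. It also adds the corresponding file source log accessors to `SQLConf` and fixes the misspelled `fileSinkLogCompatInterval` accessor name (now `fileSinkLogCompactInterval`).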

## How was this patch tested?

N/A

Ping zsxwing, please review again. Sorry for the inconvenience, and thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #15173 from jerryshao/SPARK-15698-follow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e48ebc4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e48ebc4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e48ebc4e

Branch: refs/heads/master
Commit: e48ebc4e403ca3a0e580b47aadffe9fbfcf3c655
Parents: 1ea4991
Author: jerryshao <ss...@hortonworks.com>
Authored: Tue Sep 20 22:36:24 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Sep 20 22:36:24 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/FileStreamSinkLog.scala   | 9 +++------
 .../spark/sql/execution/streaming/FileStreamSourceLog.scala | 7 +++----
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 8 +++++++-
 3 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 64f2f00..f9e2416 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -84,14 +84,11 @@ class FileStreamSinkLog(
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
-  protected override val fileCleanupDelayMs =
-    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
-  protected override val isDeletingExpiredLog =
-    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+  protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
 
-  protected override val compactInterval =
-    sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")

http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 8103309..4681f2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -39,16 +39,15 @@ class FileStreamSourceLog(
 
   // Configurations about metadata compaction
   protected override val compactInterval =
-  sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+    sparkSession.sessionState.conf.fileSourceLogCompactInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
       s"positive value.")
 
   protected override val fileCleanupDelayMs =
-    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+    sparkSession.sessionState.conf.fileSourceLogCleanupDelay
 
-  protected override val isDeletingExpiredLog =
-    sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+  protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e48ebc4e/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f8b7a7f..e67140f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -620,10 +620,16 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
 
-  def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+  def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
 
   def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
 
+  def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
+
+  def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
+
+  def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)
+
   def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org