Posted to commits@spark.apache.org by we...@apache.org on 2016/12/22 07:30:12 UTC

spark git commit: [SQL] Minor readability improvement for partition handling code

Repository: spark
Updated Branches:
  refs/heads/master ff7d82a20 -> 7c5b7b3a2


[SQL] Minor readability improvement for partition handling code

## What changes were proposed in this pull request?
This patch includes minor changes to improve the readability of the partition handling code. While implementing a new feature, I found some of the naming and implicit type inference not as intuitive as it could be.

## How was this patch tested?
This patch should introduce no semantic change, and the modified code should be covered by existing test cases.

Author: Reynold Xin <rx...@databricks.com>

Closes #16378 from rxin/minor-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c5b7b3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c5b7b3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c5b7b3a

Branch: refs/heads/master
Commit: 7c5b7b3a2e5a7c1b2d0d8ce655840cad581e47ac
Parents: ff7d82a
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Dec 22 15:29:56 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Dec 22 15:29:56 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      |  7 +-
 .../datasources/CatalogFileIndex.scala          | 11 +--
 .../sql/execution/datasources/FileFormat.scala  |  3 +-
 .../execution/datasources/FileStatusCache.scala | 72 ++++++++++----------
 4 files changed, 49 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e485b52..7616164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -136,7 +136,7 @@ case class RowDataSourceScanExec(
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Data source filters to use for filtering data within partitions.
- * @param metastoreTableIdentifier
+ * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
@@ -147,10 +147,10 @@ case class FileSourceScanExec(
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
-  val supportsBatch = relation.fileFormat.supportBatch(
+  val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
-  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+  val needsUnsafeRowConversion: Boolean = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
     SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
   } else {
     false
@@ -516,7 +516,6 @@ case class FileSourceScanExec(
     }
 
     // Assign files to partitions using "First Fit Decreasing" (FFD)
-    // TODO: consider adding a slop factor here?
     splitFiles.foreach { file =>
       if (currentSize + file.length > maxSplitBytes) {
         closePartition()

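For readers unfamiliar with the packing step referenced by the "First Fit Decreasing" comment above, here is a minimal, self-contained sketch of the size-based loop (simplified; the FileSlice and packFiles names are hypothetical, and this is not the actual FileSourceScanExec implementation):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified sketch: sort files by length descending, then greedily
// append each file to the current partition, closing it once the size budget
// (maxSplitBytes) would be exceeded. Mirrors only the shape of the loop above.
object PackingSketch {
  case class FileSlice(path: String, length: Long)

  def packFiles(files: Seq[FileSlice], maxSplitBytes: Long): Seq[Seq[FileSlice]] = {
    val partitions = ArrayBuffer.empty[Seq[FileSlice]]
    val current = ArrayBuffer.empty[FileSlice]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }

    // Largest files first ("decreasing"), then greedy packing.
    files.sortBy(-_.length).foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      current += file
      currentSize += file.length
    }
    closePartition()
    partitions.toList
  }
}
```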
http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4ad91dc..1235a4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -37,14 +38,15 @@ class CatalogFileIndex(
     val table: CatalogTable,
     override val sizeInBytes: Long) extends FileIndex {
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()
 
-  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+  /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
 
   assert(table.identifier.database.isDefined,
     "The table identifier must be qualified in CatalogFileIndex")
 
-  private val baseLocation = table.storage.locationUri
+  private val baseLocation: Option[String] = table.storage.locationUri
 
   override def partitionSchema: StructType = table.partitionSchema
 
@@ -76,7 +78,8 @@ class CatalogFileIndex(
       new PrunedInMemoryFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
     } else {
-      new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
+      new InMemoryFileIndex(
+        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
     }
   }
 

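As an illustration of the renamed factory, here is a hedged sketch of how a caller might go through the shared cache; getOrCreate, getLeafFiles and putLeafFiles come from this patch (see the FileStatusCache changes further down), while the helper object and the listLeafFiles parameter are hypothetical:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileStatusCache

// Hypothetical helper (not part of this patch): list a path's leaf files through
// the session's globally shared FileStatusCache, falling back to a caller-supplied
// listing function on a miss and populating the cache for subsequent lookups.
object CachedListingSketch {
  def cachedLeafFiles(
      session: SparkSession,
      path: Path,
      listLeafFiles: Path => Array[FileStatus]): Array[FileStatus] = {
    val cache = FileStatusCache.getOrCreate(session)
    cache.getLeafFiles(path).getOrElse {
      val statuses = listLeafFiles(path)
      cache.putLeafFiles(path, statuses)
      statuses
    }
  }
}
```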
http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 4f4aaaa..6784ee2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -148,7 +148,8 @@ trait FileFormat {
  * The base class file format that is based on text file.
  */
 abstract class TextBasedFileFormat extends FileFormat {
-  private var codecFactory: CompressionCodecFactory = null
+  private var codecFactory: CompressionCodecFactory = _
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],

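For context, the `= _` initializer above is the idiomatic Scala way to request the type's default value for a var; a small standalone illustration (hypothetical class, not from this patch):

```scala
// Hypothetical illustration of the `= _` default-initializer idiom:
// `_` assigns the type's default value (null for reference types, 0 for numeric
// types, false for Boolean) instead of spelling the literal out.
class LazyHolder {
  private var cached: String = _   // starts out null
  private var hits: Int = _        // starts out 0

  def get(): String = {
    if (cached == null) {
      cached = "computed once"
    }
    hits += 1
    cached
  }
}
```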
http://git-wip-us.apache.org/repos/asf/spark/blob/7c5b7b3a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index 7c2e6fd..5d97558 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -26,9 +25,38 @@ import com.google.common.cache._
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+import org.apache.spark.util.SizeEstimator
+
+
+/**
+ * Use [[FileStatusCache.getOrCreate()]] to construct a globally shared file status cache.
+ */
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = _
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache memory quota is
+   *         shared across all clients.
+   */
+  def getOrCreate(session: SparkSession): FileStatusCache = synchronized {
+    if (session.sqlContext.conf.manageFilesourcePartitions &&
+      session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+      if (sharedCache == null) {
+        sharedCache = new SharedInMemoryCache(
+          session.sqlContext.conf.filesourcePartitionFileCacheSize)
+      }
+      sharedCache.createForNewClient()
+    } else {
+      NoopCache
+    }
+  }
+
+  def resetForTesting(): Unit = synchronized {
+    sharedCache = null
+  }
+}
+
 
 /**
  * A cache of the leaf files of partition directories. We cache these files in order to speed
@@ -55,32 +83,6 @@ abstract class FileStatusCache {
   def invalidateAll(): Unit
 }
 
-object FileStatusCache {
-  private var sharedCache: SharedInMemoryCache = null
-
-  /**
-   * @return a new FileStatusCache based on session configuration. Cache memory quota is
-   *         shared across all clients.
-   */
-  def newCache(session: SparkSession): FileStatusCache = {
-    synchronized {
-      if (session.sqlContext.conf.manageFilesourcePartitions &&
-          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
-        if (sharedCache == null) {
-          sharedCache = new SharedInMemoryCache(
-            session.sqlContext.conf.filesourcePartitionFileCacheSize)
-        }
-        sharedCache.getForNewClient()
-      } else {
-        NoopCache
-      }
-    }
-  }
-
-  def resetForTesting(): Unit = synchronized {
-    sharedCache = null
-  }
-}
 
 /**
  * An implementation that caches partition file statuses in memory.
@@ -88,7 +90,6 @@ object FileStatusCache {
  * @param maxSizeInBytes max allowable cache size before entries start getting evicted
  */
 private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
-  import FileStatusCache._
 
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
@@ -102,8 +103,9 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
       }})
     .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
-      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]) = {
-        if (removed.getCause() == RemovalCause.SIZE &&
+      override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]])
+        : Unit = {
+        if (removed.getCause == RemovalCause.SIZE &&
             warnedAboutEviction.compareAndSet(false, true)) {
           logWarning(
             "Evicting cached table partition metadata from memory due to size constraints " +
@@ -112,13 +114,13 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         }
       }})
     .maximumWeight(maxSizeInBytes)
-    .build()
+    .build[(ClientId, Path), Array[FileStatus]]()
 
   /**
    * @return a FileStatusCache that does not share any entries with any other client, but does
    *         share memory resources for the purpose of cache eviction.
    */
-  def getForNewClient(): FileStatusCache = new FileStatusCache {
+  def createForNewClient(): FileStatusCache = new FileStatusCache {
     val clientId = new Object()
 
     override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
@@ -126,7 +128,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
     }
 
     override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {
-      cache.put((clientId, path), leafFiles.toArray)
+      cache.put((clientId, path), leafFiles)
     }
 
     override def invalidateAll(): Unit = {


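To make the SharedInMemoryCache changes easier to follow, here is a hedged, standalone sketch of the underlying Guava pattern: one weight-bounded cache whose keys are (clientId, key) pairs, so all clients share a single memory budget while keeping their entries isolated (the example names, key/value types, and sizes are hypothetical, not Spark code):

```scala
import com.google.common.cache.{CacheBuilder, RemovalCause, RemovalListener, RemovalNotification, Weigher}

// Hypothetical illustration of the SharedInMemoryCache pattern: a single Guava
// cache bounded by total weight, keyed by (clientId, key) so entries from
// different clients never collide but still count against one budget.
object SharedWeightedCacheSketch {
  type ClientId = AnyRef

  def main(args: Array[String]): Unit = {
    val cache = CacheBuilder.newBuilder()
      .weigher(new Weigher[(ClientId, String), Array[Byte]] {
        override def weigh(key: (ClientId, String), value: Array[Byte]): Int = value.length
      })
      .removalListener(new RemovalListener[(ClientId, String), Array[Byte]] {
        override def onRemoval(
            removed: RemovalNotification[(ClientId, String), Array[Byte]]): Unit = {
          if (removed.getCause == RemovalCause.SIZE) {
            println(s"evicted ${removed.getKey} to stay under the weight budget")
          }
        }
      })
      .maximumWeight(1024)  // total budget shared by all clients, in weigher units
      .build[(ClientId, String), Array[Byte]]()

    val clientA = new Object()
    val clientB = new Object()
    cache.put((clientA, "some/path"), Array.fill[Byte](400)(0.toByte))
    cache.put((clientB, "some/path"), Array.fill[Byte](400)(0.toByte))  // same path, isolated entry
    println(cache.getIfPresent((clientA, "some/path")) != null)  // true: clientA sees only its own entry
  }
}
```

The per-client Object key is what lets createForNewClient() hand out logically independent FileStatusCache instances while all of their entries still count against the one maximumWeight quota.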