You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/10 04:33:22 UTC

spark git commit: [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe

Repository: spark
Updated Branches:
  refs/heads/master 6e4fb0c9e -> 778f3ca81


[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe

Just replaced mutable.HashMap to ConcurrentHashMap

Author: navis.ryu <na...@apache.org>

Closes #6699 from navis/SPARK-7792 and squashes the following commits:

f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/778f3ca8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/778f3ca8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/778f3ca8

Branch: refs/heads/master
Commit: 778f3ca81f8d90faec0775509632fe68f1399dc4
Parents: 6e4fb0c
Author: navis.ryu <na...@apache.org>
Authored: Tue Jun 9 19:33:00 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jun 9 19:33:00 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Catalog.scala   | 28 ++++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/778f3ca8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 3e240fd..1541491 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new mutable.HashMap[String, LogicalPlan]()
+  val tables = new ConcurrentHashMap[String, LogicalPlan]
 
   override def registerTable(
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables += ((getDbTableName(tableIdent), plan))
+    tables.put(getDbTableName(tableIdent), plan)
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables -= getDbTableName(tableIdent)
+    tables.remove(getDbTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.get(getDbTableName(tableIdent)) match {
-      case Some(_) => true
-      case None => false
-    }
+    tables.containsKey(getDbTableName(tableIdent))
   }
 
   override def lookupRelation(
@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val tableFullName = getDbTableName(tableIdent)
-    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+    val table = tables.get(tableFullName)
+    if (table == null) {
+      sys.error(s"Table Not Found: $tableFullName")
+    }
     val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    tables.map {
-      case (name, _) => (name, true)
-    }.toSeq
+    val result = ArrayBuffer.empty[(String, Boolean)]
+    for (name <- tables.keySet()) {
+      result += ((name, true))
+    }
+    result
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org