You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/10 04:33:22 UTC
spark git commit: [SPARK-7792] [SQL] HiveContext registerTempTable
not thread safe
Repository: spark
Updated Branches:
refs/heads/master 6e4fb0c9e -> 778f3ca81
[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
Just replaced mutable.HashMap to ConcurrentHashMap
Author: navis.ryu <na...@apache.org>
Closes #6699 from navis/SPARK-7792 and squashes the following commits:
f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/778f3ca8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/778f3ca8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/778f3ca8
Branch: refs/heads/master
Commit: 778f3ca81f8d90faec0775509632fe68f1399dc4
Parents: 6e4fb0c
Author: navis.ryu <na...@apache.org>
Authored: Tue Jun 9 19:33:00 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jun 9 19:33:00 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Catalog.scala | 28 ++++++++++++--------
1 file changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/778f3ca8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 3e240fd..1541491 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -17,7 +17,11 @@
package org.apache.spark.sql.catalyst.analysis
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
}
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
- val tables = new mutable.HashMap[String, LogicalPlan]()
+ val tables = new ConcurrentHashMap[String, LogicalPlan]
override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
- tables += ((getDbTableName(tableIdent), plan))
+ tables.put(getDbTableName(tableIdent), plan)
}
override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
- tables -= getDbTableName(tableIdent)
+ tables.remove(getDbTableName(tableIdent))
}
override def unregisterAllTables(): Unit = {
@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
- tables.get(getDbTableName(tableIdent)) match {
- case Some(_) => true
- case None => false
- }
+ tables.containsKey(getDbTableName(tableIdent))
}
override def lookupRelation(
@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent)
- val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+ val table = tables.get(tableFullName)
+ if (table == null) {
+ sys.error(s"Table Not Found: $tableFullName")
+ }
val tableWithQualifiers = Subquery(tableIdent.last, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- tables.map {
- case (name, _) => (name, true)
- }.toSeq
+ val result = ArrayBuffer.empty[(String, Boolean)]
+ for (name <- tables.keySet()) {
+ result += ((name, true))
+ }
+ result
}
override def refreshTable(databaseName: String, tableName: String): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org