You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yana Kadiyska (JIRA)" <ji...@apache.org> on 2015/05/21 18:07:17 UTC
[jira] [Created] (SPARK-7792) HiveContext registerTempTable not
thread safe
Yana Kadiyska created SPARK-7792:
------------------------------------
Summary: HiveContext registerTempTable not thread safe
Key: SPARK-7792
URL: https://issues.apache.org/jira/browse/SPARK-7792
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.3.1
Reporter: Yana Kadiyska
{quote}
// Repro for SPARK-7792: HiveContext.registerTempTable is not thread safe.
// Spins up a fixed pool of QueryJob workers that all share one HiveContext.
public class ThreadRepro {
public static void main(String[] args) throws Exception{
new ThreadRepro().sparkPerfTest();
}
/** Runs poolSize concurrent QueryJob workers against a single shared HiveContext. */
public void sparkPerfTest(){
final AtomicLong counter = new AtomicLong();
SparkConf conf = new SparkConf();
conf.setAppName("My Application");
conf.setMaster("local[7]");
SparkContext sc = new SparkContext(conf);
org.apache.spark.sql.hive.HiveContext hc = new org.apache.spark.sql.hive.HiveContext(sc);
int poolSize = 10;
ExecutorService pool = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < poolSize; i++) {
pool.execute(new QueryJob(hc, i, counter));
}
pool.shutdown();
try {
// awaitTermination returns false on timeout; report that instead of ignoring it.
if (!pool.awaitTermination(60, TimeUnit.MINUTES)) {
System.out.println("Timed out waiting for jobs to finish");
}
} catch (InterruptedException e) {
// Restore the interrupt status rather than swallowing it (catch was a broad
// `Exception` before, hiding the only checked type awaitTermination throws).
Thread.currentThread().interrupt();
System.out.println("Thread interrupted");
}
System.out.println("All jobs complete");
System.out.println(" Counter is "+counter.get());
}
}
// Worker: repeatedly registers a uniquely named temp table, queries it, and
// drops it, printing the temp-table catalog at each step to expose the race.
class QueryJob implements Runnable{
String threadId;                 // unique prefix ("thread_<id>") for this worker's table names
org.apache.spark.sql.hive.HiveContext sqlContext;  // shared across all workers — the contended object
String key;
AtomicLong counter;              // shared completion counter owned by the driver
final AtomicLong local_counter = new AtomicLong();
public QueryJob(org.apache.spark.sql.hive.HiveContext _sqlContext,int id,AtomicLong ctr){
threadId = "thread_"+id;
this.sqlContext= _sqlContext;
this.counter = ctr;
}
public void run() {
for (int i = 0; i < 100; i++) {
String tblName = threadId +"_"+i;
DataFrame df = sqlContext.emptyDataFrame();
df.registerTempTable(tblName);
String _query = String.format("select count(*) from %s",tblName);
System.out.println(String.format(" registered table %s; catalog (%s) ",tblName,debugTables()));
List<Row> res;
try {
res = sqlContext.sql(_query).collectAsList();
} catch (Exception e) {
// Dump the catalog so the failing run shows which thread's table vanished.
System.out.println("*Exception "+ debugTables() +"**");
throw e;
}
sqlContext.dropTempTable(tblName);
System.out.println(" dropped table "+tblName);
try {
Thread.sleep(3000); // keep this a not-so-tight loop
} catch (InterruptedException e) {
// Restore the interrupt status and stop the worker; the original swallowed the
// interrupt, which would make every later sleep throw immediately and spin.
Thread.currentThread().interrupt();
System.out.println("Thread interrupted");
return;
}
}
}
/** Comma-joined snapshot of the current temp-table names, for diagnostics. */
private String debugTables(){
// String.join never returns null, so the original's post-join null check
// (on Guava Joiner's result, which is also never null) was dead code.
return String.join(",", sqlContext.tableNames());
}
}
{quote}
Running this will periodically produce output like the following:
{quote}
registered table thread_0_50; catalog (thread_1_50)
registered table thread_4_50; catalog (thread_4_50,thread_1_50)
registered table thread_1_50; catalog (thread_1_50)
dropped table thread_1_50
dropped table thread_4_50
*Exception **
Exception in thread "pool-6-thread-1" java.lang.Error: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
at test.unit.QueryJob.run(ThreadRepro.java:93)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
{quote}
Line 93 is the .sql call...
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org