You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/06/08 09:09:00 UTC
[jira] [Assigned] (SPARK-7792) HiveContext registerTempTable not
thread safe
[ https://issues.apache.org/jira/browse/SPARK-7792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-7792:
-----------------------------------
Assignee: (was: Apache Spark)
> HiveContext registerTempTable not thread safe
> ---------------------------------------------
>
> Key: SPARK-7792
> URL: https://issues.apache.org/jira/browse/SPARK-7792
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.1
> Reporter: Yana Kadiyska
>
> {code:java}
> public class ThreadRepro {
> public static void main(String[] args) throws Exception{
> new ThreadRepro().sparkPerfTest();
> }
> public void sparkPerfTest(){
> final AtomicLong counter = new AtomicLong();
> SparkConf conf = new SparkConf();
> conf.setAppName("My Application");
> conf.setMaster("local[7]");
> SparkContext sc = new SparkContext(conf);
> org.apache.spark.sql.hive.HiveContext hc = new org.apache.spark.sql.hive.HiveContext(sc);
> int poolSize = 10;
> ExecutorService pool = Executors.newFixedThreadPool(poolSize);
> for (int i=0; i<poolSize;i++ )
> pool.execute(new QueryJob(hc, i, counter));
> pool.shutdown();
> try {
> pool.awaitTermination(60, TimeUnit.MINUTES);
> }catch(Exception e){
> System.out.println("Thread interrupted");
> }
> System.out.println("All jobs complete");
> System.out.println(" Counter is "+counter.get());
> }
> }
> class QueryJob implements Runnable{
> String threadId;
> org.apache.spark.sql.hive.HiveContext sqlContext;
> String key;
> AtomicLong counter;
> final AtomicLong local_counter = new AtomicLong();
> public QueryJob(org.apache.spark.sql.hive.HiveContext _sqlContext,int id,AtomicLong ctr){
> threadId = "thread_"+id;
> this.sqlContext= _sqlContext;
> this.counter = ctr;
> }
> public void run() {
> for (int i = 0; i < 100; i++) {
> String tblName = threadId +"_"+i;
> DataFrame df = sqlContext.emptyDataFrame();
> df.registerTempTable(tblName);
> String _query = String.format("select count(*) from %s",tblName);
> System.out.println(String.format(" registered table %s; catalog (%s) ",tblName,debugTables()));
> List<Row> res;
> try {
> res = sqlContext.sql(_query).collectAsList();
> }catch (Exception e){
> System.out.println("*Exception "+ debugTables() +"**");
> throw e;
> }
> sqlContext.dropTempTable(tblName);
> System.out.println(" dropped table "+tblName);
> try {
> Thread.sleep(3000);//lets make this a not-so-tight loop
> }catch(Exception e){
> System.out.println("Thread interrupted");
> }
> }
> }
> private String debugTables(){
> String v = Joiner.on(',').join(sqlContext.tableNames());
> if (v==null)return ""; else return v;
> }
> }
> {code}
> This will periodically produce the following:
> {quote}
> registered table thread_0_50; catalog (thread_1_50)
> registered table thread_4_50; catalog (thread_4_50,thread_1_50)
> registered table thread_1_50; catalog (thread_1_50)
> dropped table thread_1_50
> dropped table thread_4_50
> *Exception **
> Exception in thread "pool-6-thread-1" java.lang.Error: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
> at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)
> at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)
> at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
> at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
> at test.unit.QueryJob.run(ThreadRepro.java:93)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> {quote}
> Line 93 (the frame "test.unit.QueryJob.run(ThreadRepro.java:93)" in the stack trace) is the sqlContext.sql call...
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org