Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/06/08 09:09:00 UTC

[jira] [Commented] (SPARK-7792) HiveContext registerTempTable not thread safe

    [ https://issues.apache.org/jira/browse/SPARK-7792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14576681#comment-14576681 ] 

Apache Spark commented on SPARK-7792:
-------------------------------------

User 'navis' has created a pull request for this issue:
https://github.com/apache/spark/pull/6699

> HiveContext registerTempTable not thread safe
> ---------------------------------------------
>
>                 Key: SPARK-7792
>                 URL: https://issues.apache.org/jira/browse/SPARK-7792
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.1
>            Reporter: Yana Kadiyska
>
> {code:java}
> // Repro: several threads share one HiveContext and concurrently
> // register, query, and drop uniquely named temp tables.
> import java.util.List;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicLong;
>
> import com.google.common.base.Joiner;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.SparkContext;
> import org.apache.spark.sql.DataFrame;
> import org.apache.spark.sql.Row;
>
> public class ThreadRepro {
>     public static void main(String[] args) throws Exception{
>        new ThreadRepro().sparkPerfTest();
>     }
>     public void sparkPerfTest(){
>         final AtomicLong counter = new AtomicLong();
>         SparkConf conf = new SparkConf();
>         conf.setAppName("My Application");
>         conf.setMaster("local[7]");
>         SparkContext sc = new SparkContext(conf);
>         org.apache.spark.sql.hive.HiveContext hc = new org.apache.spark.sql.hive.HiveContext(sc);
>         int poolSize = 10;
>         ExecutorService pool = Executors.newFixedThreadPool(poolSize);
>         for (int i = 0; i < poolSize; i++) {
>             pool.execute(new QueryJob(hc, i, counter));
>         }
>         pool.shutdown();
>         try {
>             pool.awaitTermination(60, TimeUnit.MINUTES);
>         }catch(Exception e){
>             System.out.println("Thread interrupted");
>         }
>         System.out.println("All jobs complete");
>         System.out.println(" Counter is "+counter.get());
>     }
> }
> class QueryJob implements Runnable{
>     String threadId;
>     org.apache.spark.sql.hive.HiveContext sqlContext;
>     String key;
>     AtomicLong counter;
>     final AtomicLong local_counter = new AtomicLong();
>     public QueryJob(org.apache.spark.sql.hive.HiveContext _sqlContext,int id,AtomicLong ctr){
>         threadId = "thread_"+id;
>         this.sqlContext= _sqlContext;
>         this.counter = ctr;
>     }
>     public void run() {
>         for (int i = 0; i < 100; i++) {
>             String tblName = threadId +"_"+i;
>             DataFrame df = sqlContext.emptyDataFrame();
>             df.registerTempTable(tblName);
>             String _query = String.format("select count(*) from %s",tblName);
>             System.out.println(String.format(" registered table %s; catalog (%s) ",tblName,debugTables()));
>             List<Row> res;
>             try {
>                 res = sqlContext.sql(_query).collectAsList();
>             }catch (Exception e){
>                 System.out.println("*Exception "+ debugTables() +"**");
>                 throw e;
>             }
>             sqlContext.dropTempTable(tblName);
>             System.out.println(" dropped table "+tblName);
>             try {
>                 Thread.sleep(3000); // let's make this a not-so-tight loop
>             }catch(Exception e){
>                 System.out.println("Thread interrupted");
>             }
>         }
>     }
>     private String debugTables(){
>         String v = Joiner.on(',').join(sqlContext.tableNames());
>         return v == null ? "" : v;
>     }
> }
> {code}
> This will periodically produce output like the following:
> {quote}
>  registered table thread_0_50; catalog (thread_1_50)
>  registered table thread_4_50; catalog (thread_4_50,thread_1_50)
>  registered table thread_1_50; catalog (thread_1_50)
>  dropped table thread_1_50
>  dropped table thread_4_50
> *Exception **
> Exception in thread "pool-6-thread-1" java.lang.Error: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
>   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
>   at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)
>   at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)
>   at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
>   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>   at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
>   at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
>   at test.unit.QueryJob.run(ThreadRepro.java:93)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> {quote}
> Line 93 in the stack trace (ThreadRepro.java:93) is the sqlContext.sql(...) call.
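
For context until the fix from the pull request above lands: the trace shows analysis running eagerly inside the DataFrame constructor (assertAnalyzed), so one possible mitigation is to serialize every call that reads or mutates the temp-table catalog (registerTempTable, sql, dropTempTable) on a single shared lock, while leaving job execution (collectAsList) unsynchronized. This is only a sketch; the SafeTempTableOps helper and CATALOG_LOCK names are illustrative and are not part of the reported code or the PR.
{code:java}
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// Hypothetical helper: funnel all catalog-touching calls through one lock so
// concurrent threads cannot race on the shared HiveContext's temp-table catalog.
class SafeTempTableOps {
    private static final Object CATALOG_LOCK = new Object();

    // Register the temp table and analyze the query under the lock; the
    // returned DataFrame can then be collected outside the lock.
    static DataFrame registerAndQuery(HiveContext ctx, DataFrame df,
                                      String tblName, String query) {
        synchronized (CATALOG_LOCK) {
            df.registerTempTable(tblName);
            return ctx.sql(query); // analysis (catalog lookup) happens here
        }
    }

    static void drop(HiveContext ctx, String tblName) {
        synchronized (CATALOG_LOCK) {
            ctx.dropTempTable(tblName);
        }
    }
}
{code}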


