You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yana Kadiyska (JIRA)" <ji...@apache.org> on 2015/05/21 18:07:17 UTC

[jira] [Created] (SPARK-7792) HiveContext registerTempTable not thread safe

Yana Kadiyska created SPARK-7792:
------------------------------------

             Summary: HiveContext registerTempTable not thread safe
                 Key: SPARK-7792
                 URL: https://issues.apache.org/jira/browse/SPARK-7792
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.3.1
            Reporter: Yana Kadiyska


{quote}
public class ThreadRepro {
    public static void main(String[] args) throws Exception{
       new ThreadRepro().sparkPerfTest();
    }

    public void sparkPerfTest(){

        final AtomicLong counter = new AtomicLong();
        SparkConf conf = new SparkConf();
        conf.setAppName("My Application");
        conf.setMaster("local[7]");
        SparkContext sc = new SparkContext(conf);

        org.apache.spark.sql.hive.HiveContext hc = new org.apache.spark.sql.hive.HiveContext(sc);
        int poolSize = 10;
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i=0; i<poolSize;i++ )
            pool.execute(new QueryJob(hc, i, counter));

        pool.shutdown();
        try {
            pool.awaitTermination(60, TimeUnit.MINUTES);
        }catch(Exception e){
            System.out.println("Thread interrupted");
        }
        System.out.println("All jobs complete");
        System.out.println(" Counter is "+counter.get());

    }
}

class QueryJob implements Runnable{
    String threadId;
    org.apache.spark.sql.hive.HiveContext sqlContext;
    String key;
    AtomicLong counter;
    final AtomicLong local_counter = new AtomicLong();

    public QueryJob(org.apache.spark.sql.hive.HiveContext _sqlContext,int id,AtomicLong ctr){

        threadId = "thread_"+id;
        this.sqlContext= _sqlContext;
        this.counter = ctr;
    }
    public void run() {
        for (int i = 0; i < 100; i++) {
            String tblName = threadId +"_"+i;
            DataFrame df = sqlContext.emptyDataFrame();
            df.registerTempTable(tblName);
            String _query = String.format("select count(*) from %s",tblName);
            System.out.println(String.format(" registered table %s; catalog (%s) ",tblName,debugTables()));
            List<Row> res;
            try {
                res = sqlContext.sql(_query).collectAsList();
            }catch (Exception e){
                System.out.println("*Exception "+ debugTables() +"**");
                throw e;
            }
            sqlContext.dropTempTable(tblName);
            System.out.println(" dropped table "+tblName);
            try {
                Thread.sleep(3000);//lets make this a not-so-tight loop
            }catch(Exception e){
                System.out.println("Thread interrupted");
            }
        }
    }

    private String debugTables(){
        String v = Joiner.on(',').join(sqlContext.tableNames());
        if (v==null)return ""; else return v;
    }
}
{quote}

this will periodically produce the following:

{quote}
 registered table thread_0_50; catalog (thread_1_50)
 registered table thread_4_50; catalog (thread_4_50,thread_1_50)
 registered table thread_1_50; catalog (thread_1_50)
 dropped table thread_1_50
 dropped table thread_4_50
*Exception **
Exception in thread "pool-6-thread-1" java.lang.Error: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.sql.AnalysisException: no such table thread_0_50; line 1 pos 21
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:177)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)
  at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
  at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
  at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:101)
  at test.unit.QueryJob.run(ThreadRepro.java:93)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
{quote}

Line 93 is the .sql call...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org