You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2016/11/03 05:31:58 UTC

[jira] [Created] (PIG-5054) Initialize SchemaTupleBackend correctly in backend in spark mode if spark job has more than 1 stage

liyunzhang_intel created PIG-5054:
-------------------------------------

             Summary: Initialize SchemaTupleBackend  correctly in backend in spark mode if spark job has more than 1 stage
                 Key: PIG-5054
                 URL: https://issues.apache.org/jira/browse/PIG-5054
             Project: Pig
          Issue Type: Sub-task
            Reporter: liyunzhang_intel
            Assignee: liyunzhang_intel


After PIG-4970, we remove the serialization and deserialization of jobConf in spark mode. But in script of pigmix L5.pig 
{code}
  register pigperf.jar;
A = load '/user/pig/tests/data/pigmix/page_views' using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
    as (user, action, timespent, query_term, ip_addr, timestamp,
        estimated_revenue, page_info, page_links);
B = foreach A generate user;
alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001') as (name, phone, address,
        city, state, zip);
beta = foreach alpha generate name;
C = cogroup beta by name, B by user parallel 40;
D = filter C by COUNT(beta) == 0;
E = foreach D generate group;
store E into 'L5out';
{code}

following error is thrown out in log
{noformat}
java.lang.RuntimeException: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:89)
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:57)
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
     at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
     at org.apache.spark.scheduler.Task.run(Task.scala:89)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
     at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:329)
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter$ForEachFunction$1$1.getNextResult(ForEachConverter.java:87)
     at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:69)
     ... 20 more
Caused by: java.lang.RuntimeException: initialize was not called! Even when SchemaTuple feature is not set, it should be called.
     at org.apache.pig.data.SchemaTupleBackend.newSchemaTupleFactory(SchemaTupleBackend.java:294)
     at org.apache.pig.data.SchemaTupleFactory.getInstance(SchemaTupleFactory.java:119)
     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:350)
     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:321)
     ... 22 more
{noformat}

It seems that SchemaTupleBackend is not correctly initialized.  The reason for this error is because after PIG-4970, we initialized SchemaTupleBackend in [PigInputFormatSpark#initialize|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java#L68] before we load data(stage-0). But it is not initialized in other stage(such as stage1). So if there are more than 1 stage, the exception will be thrown out.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)