Posted to issues@spark.apache.org by "poseidon (JIRA)" <ji...@apache.org> on 2017/06/05 02:10:04 UTC

[jira] [Updated] (SPARK-20896) spark executor get java.lang.ClassCastException when trigger two job at same time

     [ https://issues.apache.org/jira/browse/SPARK-20896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

poseidon updated SPARK-20896:
-----------------------------
    Description: 
1. Zeppelin 0.6.2 in *SCOPE* mode
2. Spark 1.6.2
3. HDP 2.4 for HDFS and YARN

Trigger Scala code like the following:
{noformat}
var tmpDataFrame = sql(" select b1,b2,b3 from xxx.xxxxx")
val vectorDf = assembler.transform(tmpDataFrame)
val vectRdd = vectorDf.select("features").map{x:Row => x.getAs[Vector](0)}
val correlMatrix: Matrix = Statistics.corr(vectRdd, "spearman")

val columns = correlMatrix.toArray.grouped(correlMatrix.numRows)
val rows = columns.toSeq.transpose
val vectors = rows.map(row => new DenseVector(row.toArray))
val vRdd = sc.parallelize(vectors)

import sqlContext.implicits._
val dfV = vRdd.map(_.toArray).map{ case Array(b1,b2,b3) => (b1,b2,b3) }.toDF()

val rows = dfV.rdd.zipWithIndex.map(_.swap)
              .join(sc.parallelize(Array("b1","b2","b3")).zipWithIndex.map(_.swap))
              .values.map{case (row: Row, x: String) => Row.fromSeq(row.toSeq :+ x)}
{noformat}
---
And this code:
{noformat}
// Imports implied by the snippet
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.Row

var df = sql("select b1,b2 from xxxx.xxxxx")
var i = 0
var threshold = Array(2.0, 3.0)
var inputCols = Array("b1", "b2")

// Binarize each input column against its threshold, dropping the original column
var tmpDataFrame = df
for (col <- inputCols) {
  val binarizer: Binarizer = new Binarizer().setInputCol(col)
    .setOutputCol(inputCols(i) + "_binary")
    .setThreshold(threshold(i))
  tmpDataFrame = binarizer.transform(tmpDataFrame).drop(inputCols(i))
  i = i + 1
}
var saveDFBin = tmpDataFrame

// Append column b3 row by row via zipWithIndex + join on the row index
val dfAppendBin = sql("select b3 from poseidon.corelatdemo")
val rows = saveDFBin.rdd.zipWithIndex.map(_.swap)
  .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
  .values.map { case (row1: Row, row2: Row) => Row.fromSeq(row1.toSeq ++ row2.toSeq) }

import org.apache.spark.sql.types.StructType
val rowSchema = StructType(saveDFBin.schema.fields ++ dfAppendBin.schema.fields)
saveDFBin = sqlContext.createDataFrame(rows, rowSchema)

// Save the result to a table
import org.apache.spark.sql.SaveMode
saveDFBin.write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
sql("alter table xxxx.xxxxxxxx set lifecycle 1")
{noformat}

Both snippets are run on Zeppelin in two different notebooks, sharing the same SparkContext, at the same time.

Found this exception in the executor log:
{quote}
l1.dtdream.com): java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to scala.Tuple2
        at $line127359816836.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1875)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

{quote}

OR 
{quote}
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
        at $line34684895436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{quote}

Some log output from the executor:
{quote}
17/05/26 16:39:44 INFO executor.Executor: Running task 3.1 in stage 36.0 (TID 598)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 30
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30_piece0 stored as bytes in memory (estimated size 3.0 KB, free 909.7 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 30 took 80 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_30 stored as values in memory (estimated size 5.4 KB, free 915.0 KB)
17/05/26 16:39:44 INFO parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$1: Input split: ParquetInputSplit{part: hdfs://dtyundun/user/hive/warehouse/poseidon.db/corelatdemo2/part-r-00003-985c6ac4-cf31-4d7e-be4d-90df136d6b64.gz.parquet start: 0 end: 922 length: 922 hosts: []}
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 23
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 25.0 KB, free 940.0 KB)
17/05/26 16:39:44 INFO broadcast.TorrentBroadcast: Reading broadcast variable 23 took 6 ms
17/05/26 16:39:44 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 352.7 KB, free 1292.6 KB)
17/05/26 16:39:44 INFO compress.CodecPool: Got brand-new decompressor [.gz]
17/05/26 16:39:44 ERROR executor.Executor: Exception in task 3.1 in stage 36.0 (TID 598)
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.mllib.linalg.DenseVector
	at $line169687739436.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:57)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{quote}


These two exceptions never appear together, and they never appear when the two snippets are run separately.

If you remove the zipWithIndex part from either snippet, the exception does not occur.
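
For the first snippet, a possible way to avoid zipWithIndex entirely (offered only as a workaround sketch, not a fix for the underlying bug): the correlation matrix is just numRows x numRows, so the column labels can be attached on the driver with a local zip instead of the zipWithIndex + join, which also skips the extra job zipWithIndex runs to compute partition offsets. Identifiers mirror the first snippet above; the labels array is illustrative.
{noformat}
// Workaround sketch, assuming correlMatrix and sc from the first snippet above.
// Attach the labels on the driver instead of zipWithIndex + join on the cluster.
import org.apache.spark.sql.Row

val labels = Array("b1", "b2", "b3")
val matrixRows = correlMatrix.toArray.grouped(correlMatrix.numRows).toSeq.transpose
val labeledRows = matrixRows.zip(labels).map { case (values, name) =>
  Row.fromSeq(values :+ name)
}
val labeledRdd = sc.parallelize(labeledRows)
{noformat}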

It feels as if, when the driver builds the DAG for zipWithIndex on two RDDs at the same time, something should be synchronized or locked.
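
A hypothetical user-level sketch of that idea (not a confirmed fix): if both notebooks wrap their zip/join/save paragraphs in a lock on the shared SparkContext, the conflicting jobs cannot interleave, at the cost of all concurrency between them. Identifiers mirror the second snippet above.
{noformat}
// Hypothetical serialization sketch: run the whole zip/join/save sequence under a lock
// on the shared SparkContext so the two notebooks' jobs cannot run at the same time.
// This only illustrates the "synchronize" hypothesis; it is not a fix for the bug.
import org.apache.spark.sql.{Row, SaveMode}

sc.synchronized {
  val combined = saveDFBin.rdd.zipWithIndex.map(_.swap)
    .join(dfAppendBin.rdd.zipWithIndex.map(_.swap))
    .values.map { case (r1: Row, r2: Row) => Row.fromSeq(r1.toSeq ++ r2.toSeq) }
  sqlContext.createDataFrame(combined, rowSchema)
    .write.mode(SaveMode.Overwrite).saveAsTable("xxxx.xxxxxxxx")
}
{noformat}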

As far as I know, this situation cannot be reproduced with spark-shell, spark-submit, or spark-thrift alone.

So, is this a wrong way to use the Spark shell: multiple users sharing the same context and running similar jobs? In other words, Zeppelin's SCOPE mode will not be a stable mode if we cannot handle exceptions like these.


> spark executor get java.lang.ClassCastException when trigger two job at same time
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-20896
>                 URL: https://issues.apache.org/jira/browse/SPARK-20896
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.6.1
>            Reporter: poseidon
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
