Posted to user@spark.apache.org by Anson Abraham <an...@gmail.com> on 2014/12/11 00:44:48 UTC

Unread block issue w/ spark 1.1.0 on CDH5

I recently installed Spark standalone through Cloudera Manager on my CDH
5.2 cluster.  CDH 5.2 is running on CentOS release 6.6.  The version of
Spark through Cloudera is 1.1, and it is running in standalone mode.

I have a file in HDFS at /tmp/testfile.tsv.

So what I do is run spark-shell:

scala> val source = sc.textFile("/tmp/testfile.tsv")


14/12/10 22:41:14 INFO MemoryStore: ensureFreeSpace(163794) called with curMem=0, maxMem=278302556
14/12/10 22:41:14 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.0 KB, free 265.3 MB)
14/12/10 22:41:14 INFO MemoryStore: ensureFreeSpace(13592) called with curMem=163794, maxMem=278302556
14/12/10 22:41:14 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.3 KB, free 265.2 MB)
14/12/10 22:41:14 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on cloudera-3.testdomain.net:34641 (size: 13.3 KB, free: 265.4 MB)
14/12/10 22:41:14 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
source: org.apache.spark.rdd.RDD[String] = /tmp/testfile.tsv MappedRDD[1] at textFile at <console>:12

When I run this command:
scala> source.saveAsTextFile("/tmp/zzz_testsparkoutput")

I'm hitting this:

14/11/18 21:15:08 INFO DAGScheduler: Failed to run saveAsTextFile at <console>:15
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, cloudera-1.testdomain.net): java.lang.IllegalStateException: unread block data
        java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:162)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I can't figure out what the issue is.  The file I'm loading is literally
just 7 MB.  I thought it was a jar file mismatch, but I did a compare and
they're all identical.  And seeing as they were all installed through CDH
parcels, I'm not sure how there could be a version mismatch between the
worker nodes and the master.  Oh yeah, it's 1 master node with 2 worker
nodes, running in standalone mode, not through YARN.  So just in case, I
copied the jars from the master to the 2 worker nodes anyway, and still
hit the same issue.
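
For reference, the compare I did was along these lines (a rough sketch;
passwordless ssh and the worker hostnames cloudera-2/cloudera-3 are
assumptions here, adjust to your layout):

# Checksum every Spark jar on the master, then on each worker, and diff.
SPARK_LIB=/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/lib
md5sum "$SPARK_LIB"/*.jar | sort > /tmp/jars.master
for host in cloudera-2.testdomain.net cloudera-3.testdomain.net; do
  ssh "$host" "md5sum $SPARK_LIB/*.jar | sort" > "/tmp/jars.$host"
  # diff exits 0 only when the listings are identical
  diff /tmp/jars.master "/tmp/jars.$host" && echo "$host: jars match"
done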


My spark-env.sh on the servers:

#!/usr/bin/env bash
##
# Generated by Cloudera Manager and should not be modified directly
##

export SPARK_HOME=/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark
export STANDALONE_SPARK_MASTER_HOST=cloudera-1.testdomain.net
export SPARK_MASTER_PORT=7077
export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hadoop

### Path of Spark assembly jar in HDFS
export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-/user/spark/share/lib/spark-assembly.jar}

### Let's run everything with JVM runtime, instead of Scala
export SPARK_LAUNCH_WITH_SCALA=0
export SPARK_LIBRARY_PATH=${SPARK_HOME}/lib
export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST

export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}

if [ -n "$HADOOP_HOME" ]; then
  export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:${HADOOP_HOME}/lib/native
fi

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}


And here's the spark-defaults.conf:

spark.eventLog.dir=hdfs://cloudera-2.testdomain.net:8020/user/spark/applicationHistory
spark.eventLog.enabled=true
spark.master=spark://cloudera-1.testdomain.net:7077

Any help here would be greatly appreciated.

I'm able to telnet to those ports on each server, and the hostnames all
resolve.  I'm at a loss.
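
For completeness, the connectivity check I mean is roughly this (a quick
sketch, run from each node in turn; 7077 is the only port I know the
master listens on):

# Confirm the hostnames resolve, then that the standalone master port is open.
getent hosts cloudera-1.testdomain.net cloudera-2.testdomain.net cloudera-3.testdomain.net
nc -zv cloudera-1.testdomain.net 7077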

I also tried submitting a Python file:

spark-submit --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  --executor-memory 1G --total-executor-cores 2 test.py

where test.py is essentially:

from pyspark import SparkConf, SparkContext
sc = SparkContext()
print sc.textFile('/tmp/testfile.tsv').count()

and I still get the same issue.
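
For what it's worth, one sanity check I haven't run yet (just a sketch):
submitting the same script against a local master should succeed if the
job itself is fine, which would point at the cluster rather than the code.

# Bypass the standalone master entirely; runs the job in-process
# with 2 local threads.
spark-submit --master local[2] test.py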

However, when I tried using the Spark examples jar file:

spark-submit --class org.apache.spark.examples.SparkPi \
  --master spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT \
  --executor-memory 1G --total-executor-cores 2 \
  $SPARK_HOME/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar 10

I had no issues.  What am I missing?

Thanks for any help and/or feedback.