Posted to user@spark.apache.org by Flávio Santos <ba...@chaordicsystems.com> on 2014/12/10 21:44:25 UTC

Key not valid / already cancelled using Spark Streaming

Dear Spark'ers,

I'm trying to run a simple job using Spark Streaming (version 1.1.1) and
YARN (yarn-cluster), but unfortunately I'm facing many issues. In short, my
job does the following:
- Consumes a specific Kafka topic
- Writes its content to S3 or HDFS

Records in Kafka are in the form:
{"key": "someString"}

This is important because I use the value of "key" to define the output
file name in S3.
Here are the Spark and Kafka parameters I'm using:

val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.task.maxFailures", "100")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
val kafkaParams = Map(
  "zookeeper.connect" -> zkQuorum,
  "zookeeper.session.timeout.ms" -> "10000",
  "rebalance.backoff.ms" -> "8000",
  "rebalance.max.retries" -> "10",
  "group.id" -> group,
  "auto.offset.reset" -> "largest"
)


My application is the following:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
  .foreachRDD((rdd, time) =>
    rdd.map {
      case (_, line) =>
        val json = parse(line)
        val key = extract(json, "key").getOrElse("key_not_found")
        (key, dateFormatter.format(time.milliseconds)) -> line
    }
      .partitionBy(new HashPartitioner(10))
      .saveAsHadoopFile[KeyBasedOutput[(String, String), String]]("s3://BUCKET", classOf[BZip2Codec])
  )


And the last piece:

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = key match {
    case (myKey, batchId) =>
      "somedir" + "/" + myKey + "/" +
        "prefix-" + myKey + "_" + batchId + "_" + leaf
  }
  override protected def generateActualKey(key: T, value: V) = null
}
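
(For concreteness: with a record key of ("user42", "201412101900") -- hypothetical values -- and a task leaf of "part-00003", the class above writes to somedir/user42/prefix-user42_201412101900_part-00003. Returning null from generateActualKey keeps the composite key out of the file contents, so only the raw lines are written.)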


I use a batch interval of 5 minutes with checkpointing enabled.
The job fails nondeterministically (I don't think it has ever run longer than ~5 hours). I have no clue why; it simply fails.
Please find below the exceptions thrown by my application.

I really appreciate any kind of hint.
Thank you very much in advance.

Regards,
-- Flávio

==== Executor 1

2014-12-10 19:05:15,150 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,201 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(98665) called with curMem=194463488, maxMem=4445479895
2014-12-10 19:05:15,202 INFO  [Thread-6] storage.MemoryStore (Logging.scala:logInfo(59)) - Block input-0-1418238315000 stored as bytes in memory (estimated size 96.4 KB, free 4.0 GB)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,506 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,51443)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,649 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39644)
2014-12-10 19:05:16,650 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@da2e041
2014-12-10 19:05:16,651 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@da2e041
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:316)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,679 INFO  [handle-read-write-executor-1] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,39444)
2014-12-10 19:05:16,680 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
2014-12-10 19:05:16,681 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@6a0dd98a
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:16,717 INFO  [handle-read-write-executor-3] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-EXECUTOR.compute-1.amazonaws.com,57984)
2014-12-10 19:05:17,182 ERROR [SIGTERM handler] executor.CoarseGrainedExecutorBackend (SignalLogger.scala:handle(57)) - RECEIVED SIGNAL 15: SIGTERM

==== Executor 2

2014-12-10 19:05:15,010 INFO  [handle-message-executor-11] storage.BlockManagerMaster (Logging.scala:logInfo(59)) - Updated info of block input-0-1418238314800
2014-12-10 19:05:15,157 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(59)) - Key not valid ? sun.nio.ch.SelectionKeyImpl@66ea19c
2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-2] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,157 INFO  [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logInfo(59)) - Removing ReceivingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163)
2014-12-10 19:05:15,158 ERROR [handle-read-write-executor-0] network.ConnectionManager (Logging.scala:logError(75)) - Corresponding SendingConnection to ConnectionManagerId(ec2-DRIVER.compute-1.amazonaws.com,49163) not found
2014-12-10 19:05:15,158 INFO  [connection-manager-thread] network.ConnectionManager (Logging.scala:logInfo(80)) - key already cancelled ? sun.nio.ch.SelectionKeyImpl@66ea19c
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)

==== Driver

2014-12-10 19:05:13,805 INFO  [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added input-0-1418238313600 in memory on ec2-EXECUTOR.compute-1.amazonaws.com:39444 (size: 38.2 KB, free: 4.1 GB)
2014-12-10 19:05:13,823 ERROR [sparkDriver-akka.actor.default-dispatcher-16] scheduler.JobScheduler (Logging.scala:logError(96)) - Error running job streaming job 1418238300000 ms.0
java.io.FileNotFoundException: File s3n://BUCKET/_temporary/0/task_201412101900_0039_m_000033 does not exist.
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:360)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
        at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:845)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:803)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:100)
        at MyDumperClass$$anonfun$main$1.apply(IncrementalDumpsJenkins.scala:79)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2014-12-10 19:05:13,829 INFO  [Driver] yarn.ApplicationMaster (Logging.scala:logInfo(59)) - Unregistering ApplicationMaster with FAILED


--
Flávio R. Santos

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200

Re: Key not valid / already cancelled using Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
Aah yes, that makes sense. You could write first to HDFS and, once
that works, copy from HDFS to S3. That should work since it won't
depend on the temporary files being in S3.
I am not sure how much you can customize just for S3 in Spark code. In
Spark, since we just use the Hadoop API to write, there isn't much
code, if any at all, that is customized to a particular file system. An
alternative is to see whether you could do retries efficiently for all
file systems.
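
A rough sketch of that pattern, using the Hadoop FileSystem API (the
paths here are just placeholders, not from your job):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.BZip2Codec

// 1. Commit each batch to HDFS first (placeholder path); rdd is the
//    partitioned pair RDD from the job above.
rdd.saveAsHadoopFile[KeyBasedOutput[(String, String), String]](
  "hdfs:///staging/dump", classOf[BZip2Codec])

// 2. Once the HDFS commit has succeeded, copy the batch output to S3.
val conf = new Configuration()
val hdfs = FileSystem.get(URI.create("hdfs:///"), conf)
val s3   = FileSystem.get(URI.create("s3n://BUCKET/"), conf)
FileUtil.copy(hdfs, new Path("/staging/dump"),
              s3, new Path("/dump"),
              true, // delete the HDFS copy after a successful transfer
              conf)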

TD



Re: Key not valid / already cancelled using Spark Streaming

Posted by Flávio Santos <ba...@chaordicsystems.com>.
Hello guys,

Thank you for your prompt reply.
I followed Akhil's suggestion with no success. Then I tried again,
replacing S3 with HDFS, and the job seems to work properly.
TD, I'm not using speculative execution.

I think I've just realized what is happening. Due to S3's eventual
consistency, these temporary files are sometimes found and sometimes
not. I confirmed this hypothesis via s3cmd.
So I've come up with two questions/suggestions:

1. Does Spark support writing these temporary files to HDFS while
keeping the final output on S3?
2. What do you think about adding a property like 'spark.s3.maxRetries'
that determines the number of retries before assuming that a file is
indeed not found? I can contribute this patch if you want; a rough
sketch of the idea is below.
(Hadoop already has a similar property, 'fs.s3.maxRetries', but it
applies to IOException and S3Exception.)
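
(Just to illustrate the retry behavior such a property could enable;
the helper name and defaults are hypothetical:)

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical: retry an existence check a few times before concluding
// the file is really missing, to ride out S3's eventual consistency.
def existsWithRetries(fs: FileSystem, path: Path,
                      maxRetries: Int = 5, backoffMs: Long = 1000): Boolean = {
  var attempt = 0
  while (attempt < maxRetries) {
    if (fs.exists(path)) return true
    attempt += 1
    Thread.sleep(backoffMs * attempt) // linear backoff between attempts
  }
  false
}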

Thanks again, and I look forward to your comments,


--
Flávio R. Santos

Chaordic | Platform
www.chaordic.com.br
+55 48 3232.3200


Re: Key not valid / already cancelled using Spark Streaming

Posted by Tathagata Das <ta...@gmail.com>.
Following Gerard's thoughts, here are possible things that could be happening.

1. Is there another process in the background that is deleting files
in the directory where you are trying to write? It seems like the
temporary file generated by one of the tasks is getting deleted before
it is renamed to the final output file. I suggest trying not to write
to S3 and instead just count and print (with the rest of the
computation staying exactly the same) to see if the error still
occurs; see the sketch after this list. That would narrow the culprit
down to what Gerard suggested.
2. Do you have speculative execution turned on? If so, could you turn
it off and try?
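
(A minimal sketch of that debugging variant, keeping the pipeline from
the original post but swapping the S3 write for a count and a print;
purely illustrative:)

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)
  .foreachRDD { (rdd, time) =>
    // Same map as before, but no saveAsHadoopFile: if this runs cleanly
    // for days, the failure is specific to the S3 write path.
    val n = rdd.map {
      case (_, line) =>
        val json = parse(line)
        val key = extract(json, "key").getOrElse("key_not_found")
        (key, dateFormatter.format(time.milliseconds)) -> line
    }.count()
    println(s"Batch $time: $n records")
  }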

TD




Re: Key not valid / already cancelled using Spark Streaming

Posted by Gerard Maas <ge...@gmail.com>.
If the timestamps in the logs are to be trusted, it looks like your
driver is dying with that java.io.FileNotFoundException, and therefore
the workers lose their connection and close down.

-kr, Gerard.

On Thu, Dec 11, 2014 at 7:39 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Try to add the following to the sparkConf:
>
>   .set("spark.core.connection.ack.wait.timeout", "6000")
>   .set("spark.akka.frameSize", "60")
>
> Used to face that issue with Spark 1.1.0.
>
> Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <barata@chaordicsystems.com
> > wrote:
>> [original message quoted in full; snipped]

Re: Key not valid / already cancelled using Spark Streaming

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Try adding the following to the sparkConf:

  .set("spark.core.connection.ack.wait.timeout", "6000")
  .set("spark.akka.frameSize", "60")

I used to face that issue with Spark 1.1.0.
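
For clarity, here is what that looks like merged into the SparkConf from your
post (just a sketch; the app name is from your message, and the units are
from memory: the timeout in seconds, the frame size in MB):

import org.apache.spark.SparkConf

// Sketch: the two workaround settings added to a conf like the one in the
// original post; nothing else changes.
val sparkConf = new SparkConf()
  .setAppName("MyDumperApp")
  .set("spark.core.connection.ack.wait.timeout", "6000") // wait up to 6000s for an ack
  .set("spark.akka.frameSize", "60")                     // max Akka frame size, in MB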

Thanks
Best Regards

On Thu, Dec 11, 2014 at 2:14 AM, Flávio Santos <ba...@chaordicsystems.com>
wrote:

> [original message quoted in full; snipped]