Posted to dev@beam.apache.org by Joseph Zack <jo...@broadcom.com> on 2020/06/24 01:31:05 UTC

Fwd: Streaming Beam jobs keep restarting on Spark/Kubernetes?

+dev, I'm running out of things to try.

---------- Forwarded message ---------
From: Joseph Zack <jo...@broadcom.com>
Date: Tue, Jun 23, 2020 at 9:25 PM
Subject: Re: Streaming Beam jobs keep restarting on Spark/Kubernetes?
To: <us...@beam.apache.org>


Figured I'd give this one more shot; I'm still not having much luck with
Beam+Spark+Kafka.

It just starts up and shuts down. I've gotten data through a few times, so
I'm not sure if I've got a race condition or what.

I've attached a copy of the logs and a copy of how I'm submitting the
job to Spark.

Some other pieces of information that might be helpful:

   - The code works great with the local runner
   - The topic I'm reading from only has 7 messages in it (race condition?)
   - The offsets don't get updated (unless it's one of the rare times when
   it works)
   - I never see a consumer group created for the OffsetConsumerGroup
   - Not doing anything fancy with KafkaIO (attached; a rough placeholder
   sketch is below, after this list)
   - I see one Spark/UI exception in the logs
   - Debug logging is on in the job, and in spark
   - Looks like it might be similar to this ticket, but the problem occurs
   with or without "withMaxNumRecords(1)"
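
For reference, here's roughly the shape of the read. This is only a
placeholder sketch, not the actual attached code; the topic, bootstrap
servers, and class name below are made up for illustration.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")               // placeholder
            .withTopic("my-topic")                            // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // .withMaxNumRecords(1)                          // tried with and without
            .withoutMetadata())                               // PCollection<KV<String, String>>
        .apply(Values.create());                              // keep only the record values

    p.run().waitUntilFinish();
  }
}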

*Snippet*
20/06/24 00:28:06 DEBUG servlet.ServletHandler: Chose path=/ mapped to
servlet=org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26 from
default=false
20/06/24 00:28:06 DEBUG servlet.ServletHandler: filterNameMap={}
20/06/24 00:28:06 DEBUG servlet.ServletHandler: pathFilters=null
20/06/24 00:28:06 DEBUG servlet.ServletHandler: servletFilterMap=null
20/06/24 00:28:06 DEBUG servlet.ServletHandler:
servletPathMap={/=org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26@6324b86
==org.spark_project.jetty.servlet.DefaultServlet,jsp=null,order=-1,inst=true}
20/06/24 00:28:06 DEBUG servlet.ServletHandler:
servletNameMap={org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26=org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26@6324b86
==org.spark_project.jetty.servlet.DefaultServlet,jsp=null,order=-1,inst=true}
20/06/24 00:28:06 DEBUG handler.AbstractHandler: starting
org.spark_project.jetty.servlet.ServletHandler@529648a7
20/06/24 00:28:06 DEBUG component.AbstractLifeCycle: STARTED @14744ms
org.spark_project.jetty.servlet.ServletHandler@529648a7
20/06/24 00:28:06 DEBUG component.AbstractLifeCycle: starting
org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26@6324b86
==org.spark_project.jetty.servlet.DefaultServlet,jsp=null,order=-1,inst=true
20/06/24 00:28:06 DEBUG component.AbstractLifeCycle: STARTED @14744ms
org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26@6324b86
==org.spark_project.jetty.servlet.DefaultServlet,jsp=null,order=-1,inst=true
20/06/24 00:28:06 DEBUG servlet.ServletHolder: Servlet.init
org.spark_project.jetty.servlet.DefaultServlet@5fd79ca1 for
org.spark_project.jetty.servlet.DefaultServlet-3e1d5c26
20/06/24 00:28:06 DEBUG servlet.DefaultServlet: resource base =
jar:file:/opt/spark/jars/spark-streaming_2.11-2.4.5.jar!/org/apache/spark/streaming/ui/static
20/06/24 00:28:06 INFO handler.ContextHandler: Started
o.s.j.s.ServletContextHandler@64c1b3c0
{/static/streaming,null,AVAILABLE,@Spark}
20/06/24 00:28:06 DEBUG component.AbstractLifeCycle: STARTED @14745ms
o.s.j.s.ServletContextHandler@64c1b3c0
{/static/streaming,null,AVAILABLE,@Spark}
20/06/24 00:28:06 INFO streaming.StreamingContext: StreamingContext started
20/06/24 00:28:06 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
20/06/24 00:28:06 DEBUG scheduler.JobScheduler: Stopping JobScheduler
20/06/24 00:28:06 INFO util.BatchedWriteAheadLog: BatchedWriteAheadLog
shutting down at time: 1592958486766.
20/06/24 00:28:06 WARN util.BatchedWriteAheadLog: BatchedWriteAheadLog
Writer queue interrupted.
20/06/24 00:28:06 INFO util.BatchedWriteAheadLog: BatchedWriteAheadLog
Writer thread exiting.
20/06/24 00:28:06 INFO util.FileBasedWriteAheadLog_ReceivedBlockTracker:
Stopped write ahead log manager

---------- Forwarded message ---------
From: Joseph Zack <jo...@broadcom.com>
Date: Fri, Jun 12, 2020 at 11:16 AM
Subject: Re: Streaming Beam jobs keep restarting on Spark/Kubernetes?
To: <us...@beam.apache.org>


Getting back to this: as far as I can tell, the job is exiting without
error, even though it sees an unbounded dataset.

Below is a link to the full logs for the job run:
https://pastebin.com/Rh6vTqWU

The rough steps:
* I submit the job via the GCP spark operator
<https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>
* The driver spins up and creates executors
* It connects to a Kafka topic
* Job completes and shuts down

Am I missing something? Is this what is supposed to happen?
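
For completeness, here's a minimal sketch of what I expect a streaming run to
look like, i.e. the driver blocking in waitUntilFinish() on the unbounded
Kafka source rather than returning. This is illustration only, not my actual
code; StreamingOptions, SparkRunner, and the class name are just the standard
Beam pieces I'm assuming are in play.

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

public class StreamingCheck {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);  // treat the unbounded KafkaIO source as a streaming job

    Pipeline p = Pipeline.create(options);
    // ... KafkaIO read and downstream transforms go here ...
    p.run().waitUntilFinish();   // block so the driver doesn't exit right after submission
  }
}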



[image: image.png]

On Mon, Jun 8, 2020 at 4:56 PM Joseph Zack <jo...@broadcom.com> wrote:

> I'm submitting the job via an operator provided by Google Cloud Platform.
> Here's a rough sample showing how I do it - though this is just with the
> wordcount sample:
> https://github.com/THEjoezack/beam-on-spark-on-kubernetes
>
> The driver shows as "Completed" before it starts again, but I'll dig
> deeper since it sounds fishy to you. :)
>
> On Mon, Jun 8, 2020 at 4:53 PM Kyle Weaver <kc...@google.com> wrote:
>
>> > There is no error
>>
>> Are you sure? That sounds like a crash loop to me. It might take some
>> digging through various Kubernetes logs to find the cause.
>>
>> Can you provide more information about how you're running the job?
>>
>> On Mon, Jun 8, 2020 at 1:50 PM Joseph Zack <jo...@broadcom.com>
>> wrote:
>>
>>> Anybody out there running Beam on Spark?
>>>
>>> I am pulling data from a Kafka topic with KafkaIO, but the job keeps
>>> restarting. There is no error, it just....
>>>
>>>    1. creates the driver
>>>    2. creates the executors
>>>    3. runs for a few seconds
>>>    4. terminates the executors
>>>    5. terminates the driver
>>>    6. goto #1
>>>
>>> I'm new to Beam and completely new to Spark, so I'm not sure how it's
>>> supposed to work. Is this expected behavior? I expected the Beam job to
>>> run continuously. Either I'm missing a setting, or I'm misunderstanding
>>> how things are supposed to work.
>>>
>>> Thanks for your consideration!
>>>
>>> --
>>>
>>> Joseph Zack
>>> Software Engineer   | Information Security Group   | Symantec
>>> Enterprise Division
>>> Broadcom
>>>
>>> mobile: 407.920.4930 <(407)%20920-4930>
>>>
>>> joseph.zack@broadcom.com   | broadcom.com
>>>
>>
>
> --
>
> Joseph Zack
> Software Engineer   | Information Security Group   | Symantec Enterprise
> Division
> Broadcom
>
> mobile: 407.920.4930
>
> joseph.zack@broadcom.com   | broadcom.com
>


-- 

Joseph Zack
Software Engineer   | Information Security Group   | Symantec Enterprise
Division
Broadcom

mobile: 407.920.4930

joseph.zack@broadcom.com   | broadcom.com

