Posted to dev@spark.apache.org by Renyi Xiong <re...@gmail.com> on 2015/10/06 00:03:41 UTC

Re: failure notice

If RDDs from the same DStream are not guaranteed to run on the same worker,
then the question becomes:

Is it possible to specify an unlimited batch duration in the StreamingContext
(ssc) to get a continuous stream (as opposed to a discretized one)?

Say we have a per-node streaming engine (with built-in checkpoint and
recovery) that we'd like to integrate with Spark Streaming. Can we have a
never-ending batch (or RDD) this way?

On Mon, Sep 28, 2015 at 4:31 PM, <MA...@apache.org> wrote:

> Hi. This is the qmail-send program at apache.org.
> I'm afraid I wasn't able to deliver your message to the following
> addresses.
> This is a permanent error; I've given up. Sorry it didn't work out.
>
> <us...@spark.apache.org>:
> Must be sent from an @apache.org address or a subscriber address or an
> address in LDAP.
>
> --- Below this line is a copy of the message.
>
> Date: Mon, 28 Sep 2015 16:31:42 -0700
> Message-ID: <
> CANgSV69hyqBbVb8_8zShsTLrpDy-37fjNwYVXCe-XF7DphQ8oA@mail.gmail.com>
> Subject: Re: Spark streaming DStream state on worker
> From: Renyi Xiong <re...@gmail.com>
> To: Shixiong Zhu <zs...@gmail.com>
> Cc: "user@spark.apache.org" <us...@spark.apache.org>
> Content-Type: multipart/alternative; boundary=001a11411fde922c170520d71928
>
> --001a11411fde922c170520d71928
> Content-Type: text/plain; charset=UTF-8
>
> I think you answered my question: RDDs from the same DStream are not
> guaranteed to run on the same worker.
>
> On Thu, Sep 24, 2015 at 1:51 AM, Shixiong Zhu <zs...@gmail.com> wrote:
>
> > +user, -dev
> >
> > It's not clear which `compute` you mean in your question. There are two
> > `compute` methods here.
> >
> > 1. DStream.compute: it always runs in the driver, and all RDDs are created
> > in the driver. E.g.,
> >
> > DStream.foreachRDD(rdd => rdd.count())
> >
> > "rdd.count()" is called in the driver.
> >
> > 2. RDD.compute: this runs in an executor, and the location is not
> > guaranteed. E.g.,
> >
> > DStream.foreachRDD(rdd => rdd.foreach { v =>
> >     println(v)
> > })
> >
> > "println(v)" is called in the executor.
> >
> >
> > Best Regards,
> > Shixiong Zhu
> >
> > 2015-09-17 3:47 GMT+08:00 Renyi Xiong <re...@gmail.com>:
> >
> >> Hi,
> >>
> >> I want to do a temporal join operation on a DStream across RDDs. My
> >> question is: are RDDs from the same DStream always computed on the same
> >> worker (except on failover)?
> >>
> >> thanks,
> >> Renyi.
> >>
> >
> >
> >
>
> --001a11411fde922c170520d71928--
>
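
The driver/executor distinction in the quoted answer can be sketched as a
small standalone program. This is only an illustration, not code from the
thread: the local[2] master, the 1-second batch interval, the queue-backed
input stream and the object name DriverVsExecutor are all assumptions chosen
to keep it self-contained, and it needs spark-streaming on the classpath.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverVsExecutor {
  def main(args: Array[String]): Unit = {
    // Illustrative settings only (assumptions, not from the thread).
    val conf = new SparkConf().setMaster("local[2]").setAppName("driver-vs-executor")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A queue-backed input stream, used only so the sketch needs no external source.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.parallelize(1 to 10))
    val stream = ssc.queueStream(queue)

    stream.foreachRDD { rdd =>
      // This function body runs in the driver, once per batch.
      // count() is invoked here; the job it launches runs on executors.
      val n = rdd.count()
      println(s"driver: batch has $n records")

      rdd.foreach { v =>
        // This closure is shipped to executors. Which executor handles a
        // given partition is decided by the scheduler and is not guaranteed.
        println(s"executor: $v")
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

In local mode the driver and the executor share one JVM, so both kinds of
output land on the same console; on a cluster the "executor" lines would go
to the executors' own stdout logs.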

Re: failure notice

Posted by Tathagata Das <td...@databricks.com>.
Unfortunately, there is no obvious way to do this. I am guessing that you
want to partition your stream such that the same keys always go to the same
executor, right?

You could do it by writing a custom RDD. See ShuffledRDD
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala>.
That is what is used to do a lot of shuffling. See how it is used from
RDD.partitionBy() or RDD.reduceByKey(). You could subclass it to specify a
set of preferred locations, and the system will try to respect those
locations. These locations should be among the currently active executors.
You could get the current list of executors from
SparkContext.getExecutorMemoryStatus().

Hope this helps.
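
A minimal sketch of the subclassing idea described above, under stated
assumptions: the class name PinnedShuffledRDD, the round-robin mapping from
partition index to host, and the way hosts are extracted from the
"host:port" keys of getExecutorMemoryStatus are all hypothetical choices for
illustration. Preferred locations are only a hint to the scheduler, so this
does not guarantee placement.

```scala
import scala.reflect.ClassTag
import org.apache.spark.{HashPartitioner, Partition, Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD, ShuffledRDD}

// Hypothetical subclass: asks for shuffle partition i to run on
// hosts(i % hosts.length). With the same partitioner in every batch, a given
// key maps to the same partition index and thus the same preferred host.
class PinnedShuffledRDD[K: ClassTag, V: ClassTag](
    parent: RDD[(K, V)],
    part: Partitioner,
    hosts: IndexedSeq[String])
  extends ShuffledRDD[K, V, V](parent, part) {

  // Only a scheduling preference; the task may still run elsewhere.
  override protected def getPreferredLocations(partition: Partition): Seq[String] =
    if (hosts.isEmpty) Nil else Seq(hosts(partition.index % hosts.length))
}

object PinnedShuffleExample {
  def main(args: Array[String]): Unit = {
    // local[2] is an illustrative master setting.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("pinned-shuffle"))

    // The map is keyed by "host:port" strings. Taking the host part, and
    // sorting for a stable order, is an assumption of this sketch; the map
    // also includes the driver's own entry.
    val hosts = sc.getExecutorMemoryStatus.keys
      .map(_.split(":")(0)).toVector.distinct.sorted

    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))
    val pinned = new PinnedShuffledRDD[String, Int](pairs, new HashPartitioner(4), hosts)
    pinned.collect().foreach(println)

    sc.stop()
  }
}
```

If the set of active executors changes between batches, the host list (and
therefore the mapping) changes too, so any per-key state still has to be
recoverable on another node.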

On Tue, Oct 6, 2015 at 8:27 AM, Renyi Xiong <re...@gmail.com> wrote:

> Yes, it can recover on a different node. It uses a write-ahead log,
> checkpoints the offsets of both ingress and egress (e.g. using ZooKeeper
> and/or Kafka), and relies on the streaming engine's deterministic operations.
>
> By replaying a certain range of data based on the checkpointed ingress
> offset (at-least-once semantics), state can be recovered, and duplicate
> events are filtered out based on the checkpointed egress offset
> (at-most-once semantics).
>
> hope it makes sense.
>
> On Mon, Oct 5, 2015 at 3:11 PM, Tathagata Das <td...@databricks.com> wrote:
>
>> What happens when a whole node running your "per node streaming engine
>> (built-in checkpoint and recovery)" fails? Can its checkpoint and recovery
>> mechanism handle whole node failure? Can you recover from the checkpoint on
>> a different node?
>>
>> Spark and Spark Streaming were designed with the idea that executors are
>> disposable, and there should not be any node-specific long term state that
>> you rely on unless you can recover that state on a different node.
>>
>> On Mon, Oct 5, 2015 at 3:03 PM, Renyi Xiong <re...@gmail.com>
>> wrote:
>>
>>> If RDDs from the same DStream are not guaranteed to run on the same
>>> worker, then the question becomes:
>>>
>>> Is it possible to specify an unlimited batch duration in the
>>> StreamingContext (ssc) to get a continuous stream (as opposed to a
>>> discretized one)?
>>>
>>> Say we have a per-node streaming engine (with built-in checkpoint and
>>> recovery) that we'd like to integrate with Spark Streaming. Can we have
>>> a never-ending batch (or RDD) this way?
>>>
>>
>

Re: failure notice

Posted by Renyi Xiong <re...@gmail.com>.
Yes, it can recover on a different node. It uses a write-ahead log,
checkpoints the offsets of both ingress and egress (e.g. using ZooKeeper
and/or Kafka), and relies on the streaming engine's deterministic operations.

By replaying a certain range of data based on the checkpointed ingress
offset (at-least-once semantics), state can be recovered, and duplicate
events are filtered out based on the checkpointed egress offset
(at-most-once semantics).

hope it makes sense.
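
The replay-and-filter recovery described above can be sketched in plain
Scala. Everything here is a toy model under assumptions: the Event, Output
and Checkpoint types, the running-sum operator, and the idea that the
checkpoint stores a state snapshot as of the ingress offset are hypothetical
and not taken from the engine being discussed.

```scala
object ReplayRecovery {
  final case class Event(offset: Long, value: Int)
  final case class Output(offset: Long, runningSum: Int)

  // Hypothetical checkpoint layout: a state snapshot as of ingressOffset,
  // plus the offset of the last output known to have been delivered.
  final case class Checkpoint(ingressOffset: Long, state: Int, egressOffset: Long)

  // A deterministic operator (running sum): replaying the same events from
  // the same state always yields the same state and the same outputs.
  def step(state: Int, e: Event): (Int, Output) = {
    val next = state + e.value
    (next, Output(e.offset, next))
  }

  // Replay everything after the ingress checkpoint (at-least-once input),
  // then drop outputs at or before the egress checkpoint (deduplication).
  def recover(log: Seq[Event], cp: Checkpoint): (Int, Seq[Output]) = {
    val replayed = log.filter(_.offset > cp.ingressOffset)
    val (finalState, outputs) =
      replayed.foldLeft((cp.state, Vector.empty[Output])) {
        case ((state, out), e) =>
          val (next, o) = step(state, e)
          (next, out :+ o)
      }
    (finalState, outputs.filter(_.offset > cp.egressOffset))
  }

  def main(args: Array[String]): Unit = {
    val log = (1L to 6L).map(o => Event(o, o.toInt)) // values 1..6
    // State was last snapshotted at offset 2 (sum = 3), while outputs up to
    // offset 4 had already been delivered before the failure.
    val cp = Checkpoint(ingressOffset = 2L, state = 3, egressOffset = 4L)
    val (state, emitted) = recover(log, cp)
    println(state)   // 21
    println(emitted) // Vector(Output(5,15), Output(6,21))
  }
}
```

Because nothing in recover depends on where it runs, the same checkpoint and
log can be replayed on a different node, which is the property the question
about whole-node failure is probing.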

On Mon, Oct 5, 2015 at 3:11 PM, Tathagata Das <td...@databricks.com> wrote:

> What happens when a whole node running your "per node streaming engine
> (built-in checkpoint and recovery)" fails? Can its checkpoint and recovery
> mechanism handle whole node failure? Can you recover from the checkpoint on
> a different node?
>
> Spark and Spark Streaming were designed with the idea that executors are
> disposable, and there should not be any node-specific long term state that
> you rely on unless you can recover that state on a different node.
>
> On Mon, Oct 5, 2015 at 3:03 PM, Renyi Xiong <re...@gmail.com> wrote:
>
>> If RDDs from the same DStream are not guaranteed to run on the same
>> worker, then the question becomes:
>>
>> Is it possible to specify an unlimited batch duration in the
>> StreamingContext (ssc) to get a continuous stream (as opposed to a
>> discretized one)?
>>
>> Say we have a per-node streaming engine (with built-in checkpoint and
>> recovery) that we'd like to integrate with Spark Streaming. Can we have
>> a never-ending batch (or RDD) this way?
>>
>>
>>
>

Re: failure notice

Posted by Tathagata Das <td...@databricks.com>.
What happens when a whole node running your "per node streaming engine
(built-in checkpoint and recovery)" fails? Can its checkpoint and recovery
mechanism handle whole node failure? Can you recover from the checkpoint on
a different node?

Spark and Spark Streaming were designed with the idea that executors are
disposable, and there should not be any node-specific long term state that
you rely on unless you can recover that state on a different node.

On Mon, Oct 5, 2015 at 3:03 PM, Renyi Xiong <re...@gmail.com> wrote:

> If RDDs from the same DStream are not guaranteed to run on the same
> worker, then the question becomes:
>
> Is it possible to specify an unlimited batch duration in the
> StreamingContext (ssc) to get a continuous stream (as opposed to a
> discretized one)?
>
> Say we have a per-node streaming engine (with built-in checkpoint and
> recovery) that we'd like to integrate with Spark Streaming. Can we have
> a never-ending batch (or RDD) this way?
>
> On Mon, Sep 28, 2015 at 4:31 PM, <MA...@apache.org> wrote:
>
>> Hi. This is the qmail-send program at apache.org.
>> I'm afraid I wasn't able to deliver your message to the following
>> addresses.
>> This is a permanent error; I've given up. Sorry it didn't work out.
>>
>> <us...@spark.apache.org>:
>> Must be sent from an @apache.org address or a subscriber address or an
>> address in LDAP.
>>
>> --- Below this line is a copy of the message.
>>
>> Return-Path: <re...@gmail.com>
>> Received: (qmail 95559 invoked by uid 99); 28 Sep 2015 23:31:46 -0000
>> Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142)
>>     by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 28 Sep 2015 23:31:46
>> +0000
>> Received: from localhost (localhost [127.0.0.1])
>>         by spamd3-us-west.apache.org (ASF Mail Server at
>> spamd3-us-west.apache.org) with ESMTP id 96E361809BA
>>         for <us...@spark.apache.org>; Mon, 28 Sep 2015 23:31:45 +0000
>> (UTC)
>> X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org
>> X-Spam-Flag: NO
>> X-Spam-Score: 3.129
>> X-Spam-Level: ***
>> X-Spam-Status: No, score=3.129 tagged_above=-999 required=6.31
>>         tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1,
>>         FREEMAIL_ENVFROM_END_DIGIT=0.25, HTML_MESSAGE=3,
>>         RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001]
>>         autolearn=disabled
>> Authentication-Results: spamd3-us-west.apache.org (amavisd-new);
>>         dkim=pass (2048-bit key) header.d=gmail.com
>> Received: from mx1-us-west.apache.org ([10.40.0.8])
>>         by localhost (spamd3-us-west.apache.org [10.40.0.10])
>> (amavisd-new, port 10024)
>>         with ESMTP id FAGoohFE7Y7A for <us...@spark.apache.org>;
>>         Mon, 28 Sep 2015 23:31:44 +0000 (UTC)
>> Received: from mail-la0-f51.google.com (mail-la0-f51.google.com
>> [209.85.215.51])
>>         by mx1-us-west.apache.org (ASF Mail Server at
>> mx1-us-west.apache.org) with ESMTPS id 2ED40204C9
>>         for <us...@spark.apache.org>; Mon, 28 Sep 2015 23:31:44 +0000
>> (UTC)
>> Received: by labzv5 with SMTP id zv5so32919088lab.1
>>         for <us...@spark.apache.org>; Mon, 28 Sep 2015 16:31:42 -0700
>> (PDT)
>> DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
>>         d=gmail.com; s=20120113;
>>
>> h=mime-version:in-reply-to:references:date:message-id:subject:from:to
>>          :cc:content-type;
>>         bh=F36l+I4dfDHTL7nQ0K9mAW4aVtPpVpYc0rWWpPNjt4c=;
>>
>> b=QfRdLEWf4clJqwkZSH7n0oHjXLNifWdhYxvCDZ+P37oSfM0vd/8Bx962hTflRQkD1q
>>
>>  2B3go7g8bpnQlhZgMRrZfT6hk7vUtNA3lOZjYeN+cPyoVRaBwm3LIID5vF4cw5hFAWaM
>>
>>  LUenU7E7b9kJY8JkyhIfpya8CLKz+Yo6EjCv3W6BAvv2YiNPgbOQkpx7u8y9dw0kHGig
>>
>>  1hv37Ey/DZpoKCgbSesv+sztYslevu+VBgxHFkveEyxH1saRr6OqTM7fnL2E6fP4E8qu
>>
>>  W81G1ZfNW1Pp9i5IcCb/9S7YTZDnBlUj4yROsOfNANRGmed71QpQD9l8NnAQXmeqoeNF
>>          SyEg==
>> MIME-Version: 1.0
>> X-Received: by 10.25.213.75 with SMTP id m72mr4047578lfg.17.1443483102618;
>>  Mon, 28 Sep 2015 16:31:42 -0700 (PDT)
>> Received: by 10.25.207.18 with HTTP; Mon, 28 Sep 2015 16:31:42 -0700 (PDT)
>> In-Reply-To: <CAPn6-YTZo+-A1HThmBO4mKO0sELrvxP7DZEpg3GoWN=Qz=2LPA@mail.gmail.com>
>> References: <CANgSV6-k+33GvgtiyNwhz2Gsbudf_WwwnazVUpbqe8QdCg_k3w@mail.gmail.com>
>>         <CAPn6-YQ3Q-=HMrqz5FLLPx_HmjmHkHP7cwsPYvsxw-tb7a8P=g@mail.gmail.com>
>>         <CAPn6-YTZo+-A1HThmBO4mKO0sELrvxP7DZEpg3GoWN=Qz=2LPA@mail.gmail.com>
>> Date: Mon, 28 Sep 2015 16:31:42 -0700
>> Message-ID: <CANgSV69hyqBbVb8_8zShsTLrpDy-37fjNwYVXCe-XF7DphQ8oA@mail.gmail.com>
>> Subject: Re: Spark streaming DStream state on worker
>> From: Renyi Xiong <re...@gmail.com>
>> To: Shixiong Zhu <zs...@gmail.com>
>> Cc: "user@spark.apache.org" <us...@spark.apache.org>
>> Content-Type: multipart/alternative; boundary=001a11411fde922c170520d71928
>>
>> --001a11411fde922c170520d71928
>> Content-Type: text/plain; charset=UTF-8
>>
>> You answered my question, I think: RDDs from the same DStream are not
>> guaranteed to run on the same worker.
>>
>> On Thu, Sep 24, 2015 at 1:51 AM, Shixiong Zhu <zs...@gmail.com> wrote:
>>
>> > +user, -dev
>> >
>> > It's not clear which `compute` your question refers to. There are two
>> > `compute` methods here.
>> >
>> > 1. DStream.compute: it always runs in the driver, and all RDDs are
>> > created in the driver. E.g.,
>> >
>> > DStream.foreachRDD(rdd => rdd.count())
>> >
>> > "rdd.count()" is called in the driver.
>> >
>> > 2. RDD.compute: this will run in the executor and the location is not
>> > guaranteed. E.g.,
>> >
>> > DStream.foreachRDD(rdd => rdd.foreach { v =>
>> >     println(v)
>> > })
>> >
>> > "println(v)" is called in the executor.
>> >
>> >
>> > Best Regards,
>> > Shixiong Zhu
>> >
>> > 2015-09-17 3:47 GMT+08:00 Renyi Xiong <re...@gmail.com>:
>> >
>> >> Hi,
>> >>
>> >> I want to do a temporal join operation on a DStream across RDDs. My
>> >> question is: are RDDs from the same DStream always computed on the same
>> >> worker (except on failover)?
>> >>
>> >> thanks,
>> >> Renyi.
>> >>
>> >
>> >
>> >
>>
>> --001a11411fde922c170520d71928--
>>
>
>
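For anyone who wants to try the driver/executor distinction quoted above, here is a minimal sketch, not taken from the thread. It assumes Spark Streaming is on the classpath and runs in local mode; the object name, the `local[2]` master, the queue-backed input stream and the 5 second timeout are all illustrative choices. The body of foreachRDD runs in the driver once per batch, while the closure passed to rdd.foreach runs inside tasks on whichever executors the scheduler picks.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name, used only for this illustration.
object ComputeLocations {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, so the example runs on one machine.
    val conf = new SparkConf().setMaster("local[2]").setAppName("ComputeLocations")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream builds a DStream from a queue of RDDs, one RDD per batch by default.
    val queue = mutable.Queue[RDD[Int]]()
    queue += ssc.sparkContext.parallelize(1 to 10, 2)
    queue += ssc.sparkContext.parallelize(11 to 20, 2)
    val stream = ssc.queueStream(queue)

    stream.foreachRDD { rdd =>
      // This block executes in the driver for every batch. count() is invoked
      // here; it launches a job and the result comes back to the driver.
      val n = rdd.count()
      println(s"[driver] batch has $n elements")

      rdd.foreach { v =>
        // This closure is shipped to the executors and runs inside tasks there.
        // Which executor handles which partition is decided by the scheduler.
        println(s"[executor] $v")
      }
    }

    ssc.start()
    // Let a few batches run, then shut everything down.
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

In local mode the driver and the executor threads share one JVM, so both kinds of lines show up in the same console. On a cluster the "[executor]" lines would go to the executors' stdout logs instead.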