You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Ryan Blue <rb...@netflix.com.INVALID> on 2018/11/14 00:00:45 UTC

DataSourceV2 sync tomorrow

Hi everyone,
I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
17:00 PST, which is 01:00 UTC.

Here are some of the topics under discussion in the last couple of weeks:

   - Read API for v2 - see Wenchen’s doc
   <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
   - Capabilities API - see the dev list thread
   <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
   - Using CatalogTableIdentifier to reliably separate v2 code paths - see PR
   #21978 <https://github.com/apache/spark/pull/21978>
   - A replacement for InternalRow

I know that a lot of people are also interested in combining the source API
for micro-batch and continuous streaming. Wenchen and I have been
discussing a way to do that and Wenchen has added it to the Read API doc as
Alternative #2. I think this would be a good thing to plan on discussing.

rb

Here’s some additional background on combining micro-batch and continuous
APIs:

The basic idea is to update how tasks end so that the same tasks can be
used in micro-batch or streaming. For tasks that are naturally limited like
data files, when the data is exhausted, Spark stops reading. For tasks that
are not limited, like a Kafka partition, Spark decides when to stop in
micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
keep running in continuous mode.

Note that a task deciding to stop can happen in both modes, either when a
task is exhausted in micro-batch or when a stream needs to be reconfigured
in continuous.

Here’s the task reader API. The offset returned is optional so that a task
can avoid stopping if there isn’t a resumeable offset, like if it is in the
middle of an input file:

interface StreamPartitionReader<T> extends InputPartitionReader<T> {
  Optional<LocalOffset> currentOffset();
  boolean next() // from InputPartitionReader
  T get()        // from InputPartitionReader
}

The streaming code would look something like this:

Stream stream = scan.toStream()
StreamReaderFactory factory = stream.createReaderFactory()

while (true) {
  Offset start = stream.currentOffset()
  Offset end = if (isContinuousMode) {
    None
  } else {
    // rate limiting would happen here
    Some(stream.latestOffset())
  }

  InputPartition[] parts = stream.planInputPartitions(start)

  // returns when needsReconfiguration is true or all tasks finish
  runTasks(parts, factory, end)

  // the stream's current offset has been updated at the last epoch
}

-- 
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
The live stream link for this is
https://stream.meet.google.com/stream/6be59d80-04c7-44dc-9042-4f3b597fc8ba

Some people said that it didn't work last time. I'm not sure why that would
happen, but I don't use these much so I'm no expert. If you can't join the
live stream, then feel free to join the meet up.

I'll also plan on joining earlier than I did last time, in case we the
meet/hangout needs to be up for people to view the live stream.

rb

On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue <rb...@netflix.com> wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>    - Read API for v2 - see Wenchen’s doc
>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>    - Capabilities API - see the dev list thread
>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>    - A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()        // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Below are my notes from the sync yesterday. Thanks to everyone that
participated!
For the format of this sync, I think it would help to make a small change.
Since we have so many people and it take a long time to introduce everyone,
let’s try to get to the content faster by not doing the round of
introductions and topic gathering. Instead, please send your topics to the
sync thread on this list ahead of time. Just make sure I have them and I’ll
add them to the invite and agenda, along with any links for background.

I also want to add a quick note about the live stream. After running a
couple of tests, it looks like live streams only work within an
organization. In the future, I won’t add a live stream since no one but
people from Netflix can join.

Last, here are the notes:

*Attendees*

Ryan Blue - Netflix
John Zhuge - Netflix
Yuanjian Li - Baidu - Interested in Catalog API
Felix Cheung - Uber
Hyukjin Kwon - Hortonworks
Vinoo Ganesh - Palantir
Soumya Sanyal - ?
Bruce Robbins - Cloudera
Alessandro Bellina - Oath, here to learn
Jamison Bennett - Cloudera - Interested in Catalog API
Anton Okolnychyi - Apple
Gengliang Wang - DataBricks - ORC source
Wenchen Fan - DataBricks
Dilip Biswal - IBM - Push-down of new operators like limit
Kevin Yu - IBM
Matt Cheah - Palantir - Interested in CBO
Austin Nobis - Cloudera
Jungtaek Lim - Hortonworks - Interested in exactly-once semantics
Vikram Agrawal - Custom metrics
Sribasti Chakravarti

*Suggested Topics*

   - DSv2 API changes
      - New proposal
      - Alternative #1: Combining Scan with Batch or Stream
      - Alternative #2: Combining micro-batch and continuous APIs
   - Capabilities API
   - CatalogTableIdentifier
   - Push-down API
   - CBO and stats API
   - Exactly-once semantics

*Discussion*

The entire discussion was about the DSv2 API changes in Wenchen’s design
doc.

   - Wenchen went through the current status and the new proposal.
      - Not many questions, the API and behavior are clear and
      understandable.
      - Some discussion, started by Dilip about how join push-down will
      work. Ryan noted that we just need to make sure that the design doesn’t
      preclude reasonable options for later. Wenchen suggested one such option,
      to add methods to push a join into the ScanBuilder. It isn’t clear how
      exactly this will work, but consensus seemed to be that this
will not break
      later designs. Dilip has a join push-down design doc (please reply with a
      link!).
      - Consensus was to go with the new proposal.
   - Wenchen went through alternative #1, which merges Scan into the next
   layer to produce BatchScan, MicroBatchStreamScan, ContinuousStreamScan
      - Ryan’s commentary: concerned that Scan is a distinct concept and
      may be useful in implementations. Would merging it into other
objects cause
      duplication or force an inheritance hierarchy? Clearly, the
names show that
      it is mixing two concepts: BatchScan = Batch + Scan
      - Matt commented that it seems unlikely that Scan will be
      independently useful
      - Wenchen noted that we can merge later if it isn’t useful
      - Ryan noted that separate interfaces give the most flexibility for
      implementations. An implementation can create BatchScan that
extends both.
      - Conclusion: keep the interfaces separate for now and reassess later.
   - Ryan went through alternative #2, which merges micro-batch and
   continuous read interfaces
      - To merge execution code, Spark would be responsible for stopping
      tasks. Tasks would attempt to read forever and Spark determines
whether to
      run a batch or run forever.
      - Some tasks are naturally limited, like data files added to a table.
      Spark would need to handle tasks stopping themselves early.
      - Some tasks are naturally boundless, like Kafka topic partitions.
      Tasks would need to provide offsets for Spark to decide when to stop
      reading.
      - The resulting task reader behavior is awkward and no longer fits
      either naturally limited (must provide “offset”) nor naturally boundless
      tasks (why stop early? why use micro-batch?)
      - Conclusion was to have simpler APIs by keeping modes separate.


On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue <rb...@netflix.com> wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>    - Read API for v2 - see Wenchen’s doc
>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>    - Capabilities API - see the dev list thread
>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>    - A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()        // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Posted by Cody Koeninger <co...@koeninger.org>.
Am I the only one for whom the livestream link didn't work last time?
Would like to be able to at least watch the discussion this time
around.
On Tue, Nov 13, 2018 at 6:01 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
> Read API for v2 - see Wenchen’s doc
> Capabilities API - see the dev list thread
> Using CatalogTableIdentifier to reliably separate v2 code paths - see PR #21978
> A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source API for micro-batch and continuous streaming. Wenchen and I have been discussing a way to do that and Wenchen has added it to the Read API doc as Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be used in micro-batch or streaming. For tasks that are naturally limited like data files, when the data is exhausted, Spark stops reading. For tasks that are not limited, like a Kafka partition, Spark decides when to stop in micro-batch mode by hitting a pre-determined LocalOffset or Spark can just keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a task is exhausted in micro-batch or when a stream needs to be reconfigured in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task can avoid stopping if there isn’t a resumeable offset, like if it is in the middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()        // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: DataSourceV2 sync tomorrow

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Jamison, I've added you to the invite. If anyone else wants to be invited,
please send me a request. You can send it directly to me to avoid too many
messages on this thread.

On Wed, Nov 14, 2018 at 8:57 AM Jamison Bennett
<ja...@cloudera.com.invalid> wrote:

> Hi Spark Team,
>
> I am interested in joining this meeting because I am interested in the
> data source v2 APIs. I couldn't find information about this meeting, so
> could someone please share the link?
>
> Thanks,
>
> Jamison Bennett
>
> Cloudera Software Engineer
>
> jamison.bennett@cloudera.com
>
> 515 Congress Ave, Suite 1212   |   Austin, TX   |   78701
>
>
> On Wed, Nov 14, 2018 at 1:51 AM Arun Mahadevan <ar...@apache.org> wrote:
>
>> IMO, the currentOffset should not be optional.
>> For continuous mode I assume this offset gets periodically check pointed
>> (so mandatory) ?
>> For the micro batch mode the currentOffset would be the start offset for
>> a micro-batch.
>>
>> And if the micro-batch could be executed without knowing the 'latest'
>> offset (say until 'next' returns false), we only need the current offset
>> (to figure out the offset boundaries of a micro-batch) and may be then the
>> 'latest' offset is not needed at all.
>>
>> - Arun
>>
>>
>> On Tue, 13 Nov 2018 at 16:01, Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Hi everyone,
>>> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow
>>> at 17:00 PST, which is 01:00 UTC.
>>>
>>> Here are some of the topics under discussion in the last couple of weeks:
>>>
>>>    - Read API for v2 - see Wenchen’s doc
>>>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>>>    - Capabilities API - see the dev list thread
>>>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>>>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>>>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>>>    - A replacement for InternalRow
>>>
>>> I know that a lot of people are also interested in combining the source
>>> API for micro-batch and continuous streaming. Wenchen and I have been
>>> discussing a way to do that and Wenchen has added it to the Read API doc as
>>> Alternative #2. I think this would be a good thing to plan on discussing.
>>>
>>> rb
>>>
>>> Here’s some additional background on combining micro-batch and
>>> continuous APIs:
>>>
>>> The basic idea is to update how tasks end so that the same tasks can be
>>> used in micro-batch or streaming. For tasks that are naturally limited like
>>> data files, when the data is exhausted, Spark stops reading. For tasks that
>>> are not limited, like a Kafka partition, Spark decides when to stop in
>>> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
>>> keep running in continuous mode.
>>>
>>> Note that a task deciding to stop can happen in both modes, either when
>>> a task is exhausted in micro-batch or when a stream needs to be
>>> reconfigured in continuous.
>>>
>>> Here’s the task reader API. The offset returned is optional so that a
>>> task can avoid stopping if there isn’t a resumeable offset, like if it is
>>> in the middle of an input file:
>>>
>>> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>>>   Optional<LocalOffset> currentOffset();
>>>   boolean next() // from InputPartitionReader
>>>   T get()        // from InputPartitionReader
>>> }
>>>
>>> The streaming code would look something like this:
>>>
>>> Stream stream = scan.toStream()
>>> StreamReaderFactory factory = stream.createReaderFactory()
>>>
>>> while (true) {
>>>   Offset start = stream.currentOffset()
>>>   Offset end = if (isContinuousMode) {
>>>     None
>>>   } else {
>>>     // rate limiting would happen here
>>>     Some(stream.latestOffset())
>>>   }
>>>
>>>   InputPartition[] parts = stream.planInputPartitions(start)
>>>
>>>   // returns when needsReconfiguration is true or all tasks finish
>>>   runTasks(parts, factory, end)
>>>
>>>   // the stream's current offset has been updated at the last epoch
>>> }
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>

-- 
Ryan Blue
Software Engineer
Netflix

Re: DataSourceV2 sync tomorrow

Posted by Jamison Bennett <ja...@cloudera.com.INVALID>.
Hi Spark Team,

I am interested in joining this meeting because I am interested in the data
source v2 APIs. I couldn't find information about this meeting, so could
someone please share the link?

Thanks,

Jamison Bennett

Cloudera Software Engineer

jamison.bennett@cloudera.com

515 Congress Ave, Suite 1212   |   Austin, TX   |   78701


On Wed, Nov 14, 2018 at 1:51 AM Arun Mahadevan <ar...@apache.org> wrote:

> IMO, the currentOffset should not be optional.
> For continuous mode I assume this offset gets periodically check pointed
> (so mandatory) ?
> For the micro batch mode the currentOffset would be the start offset for a
> micro-batch.
>
> And if the micro-batch could be executed without knowing the 'latest'
> offset (say until 'next' returns false), we only need the current offset
> (to figure out the offset boundaries of a micro-batch) and may be then the
> 'latest' offset is not needed at all.
>
> - Arun
>
>
> On Tue, 13 Nov 2018 at 16:01, Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Hi everyone,
>> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
>> 17:00 PST, which is 01:00 UTC.
>>
>> Here are some of the topics under discussion in the last couple of weeks:
>>
>>    - Read API for v2 - see Wenchen’s doc
>>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>>    - Capabilities API - see the dev list thread
>>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>>    - A replacement for InternalRow
>>
>> I know that a lot of people are also interested in combining the source
>> API for micro-batch and continuous streaming. Wenchen and I have been
>> discussing a way to do that and Wenchen has added it to the Read API doc as
>> Alternative #2. I think this would be a good thing to plan on discussing.
>>
>> rb
>>
>> Here’s some additional background on combining micro-batch and continuous
>> APIs:
>>
>> The basic idea is to update how tasks end so that the same tasks can be
>> used in micro-batch or streaming. For tasks that are naturally limited like
>> data files, when the data is exhausted, Spark stops reading. For tasks that
>> are not limited, like a Kafka partition, Spark decides when to stop in
>> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
>> keep running in continuous mode.
>>
>> Note that a task deciding to stop can happen in both modes, either when a
>> task is exhausted in micro-batch or when a stream needs to be reconfigured
>> in continuous.
>>
>> Here’s the task reader API. The offset returned is optional so that a
>> task can avoid stopping if there isn’t a resumeable offset, like if it is
>> in the middle of an input file:
>>
>> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>>   Optional<LocalOffset> currentOffset();
>>   boolean next() // from InputPartitionReader
>>   T get()        // from InputPartitionReader
>> }
>>
>> The streaming code would look something like this:
>>
>> Stream stream = scan.toStream()
>> StreamReaderFactory factory = stream.createReaderFactory()
>>
>> while (true) {
>>   Offset start = stream.currentOffset()
>>   Offset end = if (isContinuousMode) {
>>     None
>>   } else {
>>     // rate limiting would happen here
>>     Some(stream.latestOffset())
>>   }
>>
>>   InputPartition[] parts = stream.planInputPartitions(start)
>>
>>   // returns when needsReconfiguration is true or all tasks finish
>>   runTasks(parts, factory, end)
>>
>>   // the stream's current offset has been updated at the last epoch
>> }
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

Re: DataSourceV2 sync tomorrow

Posted by Arun Mahadevan <ar...@apache.org>.
IMO, the currentOffset should not be optional.
For continuous mode I assume this offset gets periodically check pointed
(so mandatory) ?
For the micro batch mode the currentOffset would be the start offset for a
micro-batch.

And if the micro-batch could be executed without knowing the 'latest'
offset (say until 'next' returns false), we only need the current offset
(to figure out the offset boundaries of a micro-batch) and may be then the
'latest' offset is not needed at all.

- Arun


On Tue, 13 Nov 2018 at 16:01, Ryan Blue <rb...@netflix.com.invalid> wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>    - Read API for v2 - see Wenchen’s doc
>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>    - Capabilities API - see the dev list thread
>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>    - A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()        // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>