Posted to dev@apex.apache.org by Thomas Weise <th...@datatorrent.com> on 2016/02/26 19:54:20 UTC

Re: [APEXMALHAR-1965] Create a WAL in Malhar

Tushar, any update on this?


On Sun, Jan 24, 2016 at 9:19 PM, Thomas Weise <th...@datatorrent.com>
wrote:

> The package should then be org.apache.apex.malhar.lib.wal.
>
> I would suggest putting the WindowDataManager there later as well. Chandni?
>
>
> On Thu, Jan 21, 2016 at 11:04 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
>> I will put these utilities under library, as they do not depend on external
>> packages. What should the package name be: com.datatorrent.library.wal, or
>> should we switch to the Apache Apex namespace, like org.apache.apex.lib.wal?
>>
>> - Tushar.
>>
>>
>> On Thu, Jan 21, 2016 at 10:21 PM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>>
>> > Makes sense. Let's keep the basic WAL infrastructure and the patterns on
>> > top of it separate.
>> >
>> >
>> > On Thu, Jan 21, 2016 at 8:48 AM, Tushar Gosavi <tu...@datatorrent.com>
>> > wrote:
>> >
>> > > Hi Chandni,
>> > >
>> > > There is no change in plan; I will be working on the basic WAL
>> > > infrastructure. I have not discussed how to implement WindowDataManager
>> > > using the WAL, since I know that you will be working on it.
>> > >
>> > > This mail is for declaring the interfaces for the Write Ahead Log and the
>> > > file-system-based implementations of the Write Ahead Log. The classes I
>> > > will be adding are:
>> > > - WALReader - interface.
>> > > - WALWriter - interface.
>> > > - FSWALReader - implementation of WALReader which reads from the FS.
>> > > - FSWALWriter - implementation of WALWriter which writes to the FS.
>> > > - RollingFSWALReader - implementation.
>> > > - RollingFSWALWriter - implementation.
>> > > - WALManager - interface.
>> > > - AbstractWALManager - the WALManager functionality abstracted out of
>> > >   HDHT.
>> > >
>> > > RecoverableOperator is not for idempotency; it is a pattern abstracted
>> > > out of operators like HDHT. It can be added later if it turns out to be a
>> > > common pattern.
>> > >
>> > > - Tushar.
>> > >
>> > > On Thu, Jan 21, 2016 at 6:07 PM, Chandni Singh <chandni@datatorrent.com>
>> > > wrote:
>> > >
>> > > > Hi Tushar,
>> > > > I thought the plan was to change WindowDataManager to use the WAL once
>> > > > the WAL is added to Malhar. I don't know when this plan changed.
>> > > >
>> > > > There is already a ticket for it, and I was going to work on it once the
>> > > > WAL is moved to Malhar.
>> > > >
>> > > > https://datatorrent.atlassian.net/browse/SPOI-7116
>> > > >
>> > > > A related discussion also happened here:
>> > > >
>> > > > http://mail-archives.apache.org/mod_mbox/apex-dev/201511.mbox/%3CCAKaBFBma3rtg-UV7H8Ao_cFZ3nR4u8QpKbJmuEggmrtgE2h5Kw@mail.gmail.com%3E
>> > > >
>> > > > I don't think we should add another utility that does what
>> > > > WindowDataManager does, because:
>> > > >
>> > > > 1. We would end up with duplicate utilities that try to achieve the
>> > > > same thing.
>> > > >
>> > > > 2. It has a bigger impact on operators that already work with
>> > > > WindowDataManager, and it would make them backward incompatible as well.
>> > > >
>> > > > 3. IMO creating an abstract RecoverableOperator is not a flexible design.
>> > > > Idempotency entails a higher cost. When it is a pluggable component of
>> > > > the operator, the user has the choice to turn it off by setting a NOOP
>> > > > data manager when they don't care about idempotency.
>> > > >
>> > > > 4. IMO it is always better to enhance/change existing components instead
>> > > > of adding new ones that are used for the same purpose.
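To illustrate point 3, here is a minimal Java sketch of idempotency as a pluggable component rather than a base class. The names ReplayStore, RecordingReplayStore, NoopReplayStore and ExampleInputOperator are hypothetical and do not reflect the actual WindowDataManager API; storage is an in-memory map purely for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable component: remembers what an input operator emitted in each
// window so that a replayed window emits exactly the same tuples.
interface ReplayStore<T> {
  void save(long windowId, List<T> tuples);

  List<T> retrieve(long windowId); // null when nothing was recorded for the window
}

// Idempotent choice. Kept in memory here; a real store would persist to a file system.
class RecordingReplayStore<T> implements ReplayStore<T> {
  private final Map<Long, List<T>> windows = new HashMap<>();

  public void save(long windowId, List<T> tuples) {
    windows.put(windowId, new ArrayList<>(tuples));
  }

  public List<T> retrieve(long windowId) {
    return windows.get(windowId);
  }
}

// NOOP choice: no bookkeeping cost, but a replayed window may contain different tuples.
class NoopReplayStore<T> implements ReplayStore<T> {
  public void save(long windowId, List<T> tuples) {
  }

  public List<T> retrieve(long windowId) {
    return null;
  }
}

// The operator depends only on the interface, so idempotency is a configuration choice
// rather than a base class the operator has to extend.
class ExampleInputOperator {
  private ReplayStore<String> store = new RecordingReplayStore<>();

  void setStore(ReplayStore<String> store) {
    this.store = store;
  }

  // Returns the recorded tuples if this window is being replayed; otherwise records
  // and returns the freshly read tuples.
  List<String> emitWindow(long windowId, List<String> freshlyRead) {
    List<String> recorded = store.retrieve(windowId);
    if (recorded != null) {
      return recorded;
    }
    store.save(windowId, freshlyRead);
    return freshlyRead;
  }
}
```

Swapping in NoopReplayStore through setStore() turns the cost off without changing the operator's class hierarchy.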
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Chandni
>> > > > On Jan 21, 2016 3:06 PM, "Tushar Gosavi" <tu...@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > > > Hi All,
>> > > > > We are planning to add utility classes in Malhar that provide Write
>> > > > > Ahead Log (WAL) capability to operators.
>> > > > >
>> > > > > Motivation:
>> > > > > Reconstructing operator state - Some operators keep a huge state in
>> > > > > memory, which causes checkpointing overhead and slows down processing
>> > > > > when checkpoints happen frequently. Such operators can benefit from
>> > > > > keeping the in-memory state transient and reconstructing it from the
>> > > > > stored tuples. The operator will write tuples to the WAL as they arrive
>> > > > > and then process them. During recovery, the tuples are read back from
>> > > > > the WAL to reconstruct the in-memory state. (The processing of tuples
>> > > > > needs to be idempotent.)
>> > > > >
>> > > > > Operators that maintain their state on the file system will also
>> > > > > benefit from this, as they do not have to update the persisted state
>> > > > > frequently. They can accumulate data in memory and apply it to the
>> > > > > persistent state once enough is available. On failure, the in-memory
>> > > > > state will be reconstructed from the WAL.
>> > > > >
>> > > > > You can think of this functionality as similar to the buffer server,
>> > > > > only persisted on HDFS, with the operator explicitly managing purging.
>> > > > >
>> > > > > The WAL can also be used to implement WindowDataManager, which keeps
>> > > > > the beginWindow and endWindow markers and information about the tuples
>> > > > > between them; this information is used to replay the tuples in the
>> > > > > same order. Using a WAL will result in fewer files compared to
>> > > > > FSWindowDataManager.
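A minimal sketch of keeping the window markers and tuples of many windows in one append-only log, assuming an in-memory list in place of the WAL file. The class and method names are hypothetical; the window id is stored on every entry only to keep the replay filter simple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: all windows share one append-only log instead of one file each.
class WindowLogOnWal {
  enum Kind { BEGIN_WINDOW, TUPLE, END_WINDOW }

  static final class Entry {
    final Kind kind;
    final long windowId;
    final String tuple; // only set for TUPLE entries

    Entry(Kind kind, long windowId, String tuple) {
      this.kind = kind;
      this.windowId = windowId;
      this.tuple = tuple;
    }
  }

  private final List<Entry> wal = new ArrayList<>(); // stands in for one WAL file
  private long currentWindow;

  void beginWindow(long windowId) {
    currentWindow = windowId;
    wal.add(new Entry(Kind.BEGIN_WINDOW, windowId, null));
  }

  void tuple(String tuple) {
    wal.add(new Entry(Kind.TUPLE, currentWindow, tuple));
  }

  void endWindow() {
    wal.add(new Entry(Kind.END_WINDOW, currentWindow, null));
  }

  // Returns the window's tuples in their original order.
  List<String> replay(long windowId) {
    List<String> tuples = null;
    for (Entry e : wal) {
      if (e.windowId != windowId) {
        continue; // entries of other windows share the same log
      }
      if (e.kind == Kind.BEGIN_WINDOW) {
        tuples = new ArrayList<>();
      } else if (e.kind == Kind.TUPLE && tuples != null) {
        tuples.add(e.tuple);
      } else if (e.kind == Kind.END_WINDOW) {
        return tuples;
      }
    }
    return null; // window unknown, or END_WINDOW never logged (e.g. failure mid-window)
  }
}
```

Returning null for a window without its END_WINDOW marker means a partially logged window is treated as never completed, so it is reprocessed rather than replayed.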
>> > > > >
>> > > > > General Design
>> > > > > We will introduce two interfaces.
>> > > > >
>> > > > > WALWriter - this will have the following methods:
>> > > > >  - append    : append the data at the end of the WAL.
>> > > > >  - getOffset : return the offset, which needs to be tracked for recovery.
>> > > > >  - flush     : make sure that the data is persisted on the external
>> > > > >    storage.
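A sketch of what the WALWriter contract could look like, with an in-memory implementation. The signatures and the length-prefixed record format ([4-byte length][payload]) are assumptions for illustration, not taken from the proposal.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical shape of the proposed WALWriter; method signatures are assumptions.
interface WALWriter {
  void append(byte[] entry); // append one entry at the end of the WAL

  long getOffset();          // position to remember (e.g. at checkpoint) for recovery

  void flush();              // make appended entries durable
}

// In-memory stand-in using the record format [4-byte big-endian length][payload].
class InMemoryWALWriter implements WALWriter {
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  private final DataOutputStream out = new DataOutputStream(bytes);

  @Override
  public void append(byte[] entry) {
    try {
      out.writeInt(entry.length);
      out.write(entry);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
  }

  @Override
  public long getOffset() {
    return out.size(); // bytes written so far, i.e. where the next entry will start
  }

  @Override
  public void flush() {
    // Nothing to persist in memory; a file-based writer would flush and sync here.
  }

  byte[] contents() {
    return bytes.toByteArray();
  }
}
```

The value of getOffset() after the last append of a window is the kind of offset an operator would store with its checkpoint.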
>> > > > >
>> > > > > WALReader - this will provide an iterator-like interface for accessing
>> > > > > the WAL.
>> > > > >  - seek      : seek to a particular offset.
>> > > > >  - advance   : read the next entry; returns true if an entry is available.
>> > > > >  - get       : get the current entry read by advance.
>> > > > >  - getOffset : return the current offset in the WAL.
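A matching reader sketch over the same kind of length-prefixed format. Here advance() is assumed to return false both at the end of the log and when the last entry is incomplete, which is one way to tolerate a writer that failed mid-append; ByteArrayWALReader, readFrom and encode are hypothetical helpers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the proposed WALReader; signatures are assumptions.
interface WALReader {
  void seek(long offset); // position the reader at an entry boundary

  boolean advance();      // read the next entry; false if no complete entry remains

  byte[] get();           // the entry read by the last successful advance()

  long getOffset();       // offset of the next entry to be read
}

// Reads entries in the format [4-byte big-endian length][payload] from a byte array.
class ByteArrayWALReader implements WALReader {
  private final ByteBuffer buf;
  private byte[] current;

  ByteArrayWALReader(byte[] log) {
    buf = ByteBuffer.wrap(log);
  }

  @Override
  public void seek(long offset) {
    buf.position((int) offset);
    current = null;
  }

  @Override
  public boolean advance() {
    if (buf.remaining() < 4) {
      return false;
    }
    int length = buf.getInt(buf.position()); // peek, so a torn entry is not consumed
    if (length < 0 || buf.remaining() - 4 < length) {
      return false; // partial entry at the tail, e.g. the writer failed mid-append
    }
    buf.getInt();
    current = new byte[length];
    buf.get(current);
    return true;
  }

  @Override
  public byte[] get() {
    return current;
  }

  @Override
  public long getOffset() {
    return buf.position();
  }

  // Iterates from an offset the way recovery would, decoding entries as UTF-8 strings.
  List<String> readFrom(long offset) {
    List<String> entries = new ArrayList<>();
    seek(offset);
    while (advance()) {
      entries.add(new String(get(), StandardCharsets.UTF_8));
    }
    return entries;
  }

  // Example helper: encodes strings in the same format a writer would produce.
  static byte[] encode(String... entries) {
    List<byte[]> payloads = new ArrayList<>();
    int size = 0;
    for (String e : entries) {
      byte[] b = e.getBytes(StandardCharsets.UTF_8);
      payloads.add(b);
      size += 4 + b.length;
    }
    ByteBuffer out = ByteBuffer.allocate(size);
    for (byte[] b : payloads) {
      out.putInt(b.length).put(b);
    }
    return out.array();
  }
}
```

For the entries "a", "bc" and "def", the entries start at offsets 0, 5 and 11, and the log ends at offset 18.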
>> > > > >
>> > > > > The following implementations, which work with DFS file systems, will
>> > > > > be provided. These classes will take a serializer for converting data
>> > > > > to a byte array before writing, and for converting a byte array back to
>> > > > > an object while reading.
>> > > > >
>> > > > > - FSWALReader implements WALReader
>> > > > > - FSWALWriter implements WALWriter
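A sketch of file-backed reader and writer classes that take a serializer, using java.io on the local file system instead of HDFS. The Serializer interface, the class names and the record format are assumptions; flush() calls FileDescriptor.sync() after flushing the buffer so that appended data is forced to the device.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

// Hypothetical serializer contract: the proposal only says the FS classes take "a serializer".
interface Serializer<T> {
  byte[] toBytes(T object);
  T fromBytes(byte[] bytes);
}

class Utf8StringSerializer implements Serializer<String> {
  public byte[] toBytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }
  public String fromBytes(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
}

// Writer sketch; entry format: [4-byte big-endian length][serialized bytes].
class FileWALWriter<T> implements AutoCloseable {
  private final FileOutputStream file;
  private final DataOutputStream out;
  private final Serializer<T> serializer;
  private long offset;

  FileWALWriter(Path path, Serializer<T> serializer) throws IOException {
    this.file = new FileOutputStream(path.toFile(), true); // append to an existing log
    this.out = new DataOutputStream(new BufferedOutputStream(file));
    this.serializer = serializer;
    this.offset = path.toFile().length();
  }

  void append(T entry) throws IOException {
    byte[] bytes = serializer.toBytes(entry);
    out.writeInt(bytes.length);
    out.write(bytes);
    offset += 4 + bytes.length;
  }

  long getOffset() { return offset; }

  void flush() throws IOException {
    out.flush();         // hand buffered bytes to the OS
    file.getFD().sync(); // ask the OS to force them to the storage device
  }

  @Override
  public void close() throws IOException {
    flush();
    out.close();
  }
}

class FileWALReader<T> implements AutoCloseable {
  private final RandomAccessFile file;
  private final Serializer<T> serializer;
  private T current;

  FileWALReader(Path path, Serializer<T> serializer) throws IOException {
    this.file = new RandomAccessFile(path.toFile(), "r");
    this.serializer = serializer;
  }

  void seek(long offset) throws IOException { file.seek(offset); }

  // False at the end of the log, including when the last entry was only partly written.
  boolean advance() throws IOException {
    long start = file.getFilePointer();
    long remaining = file.length() - start;
    if (remaining < 4) return false;
    int length = file.readInt();
    if (length < 0 || remaining - 4 < length) {
      file.seek(start); // leave the torn entry unread
      return false;
    }
    byte[] bytes = new byte[length];
    file.readFully(bytes);
    current = serializer.fromBytes(bytes);
    return true;
  }

  T get() { return current; }

  @Override
  public void close() throws IOException { file.close(); }
}
```

A DFS-backed version would follow the same shape over the file system's own streams; only the open, flush and seek calls would change.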
>> > > > >
>> > > > >
>> > > > > RollingFSWALReader, RollingFSWALWriter - these implementations will
>> > > > > support rolling files based on the size of the log. They will internally
>> > > > > use FSWALReader and FSWALWriter for reading and writing log segments.
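A minimal sketch of size-based rolling, assuming a pointer made of (segment number, offset within segment) and in-memory buffers in place of the per-segment FS writers. All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical rolling log: a new segment is started when the next entry would push
// the current one past maxSegmentBytes.
class RollingLogSketch {
  // A position in the rolling log: (segment, byte offset within that segment).
  static final class Pointer {
    final int segment;
    final long offset;

    Pointer(int segment, long offset) {
      this.segment = segment;
      this.offset = offset;
    }

    @Override
    public String toString() {
      return segment + ":" + offset;
    }
  }

  private final long maxSegmentBytes;
  private final List<ByteArrayOutputStream> segments = new ArrayList<>();

  RollingLogSketch(long maxSegmentBytes) {
    this.maxSegmentBytes = maxSegmentBytes;
    segments.add(new ByteArrayOutputStream());
  }

  // Appends one length-prefixed entry and returns the pointer where it starts.
  Pointer append(byte[] entry) {
    int recordSize = 4 + entry.length;
    ByteArrayOutputStream current = segments.get(segments.size() - 1);
    // Roll only if the segment already has data; an oversized entry is still written,
    // alone, into a fresh segment rather than being rejected.
    if (current.size() > 0 && current.size() + recordSize > maxSegmentBytes) {
      current = new ByteArrayOutputStream();
      segments.add(current);
    }
    Pointer start = new Pointer(segments.size() - 1, current.size());
    current.write(ByteBuffer.allocate(4).putInt(entry.length).array(), 0, 4);
    current.write(entry, 0, entry.length);
    return start;
  }

  int segmentCount() {
    return segments.size();
  }
}
```

Addressing entries by segment makes purging cheap: once the start pointer moves past a segment, that whole segment file can be deleted.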
>> > > > >
>> > > > > WALManager - this interface will provide the following methods:
>> > > > >  - setup() : set up the WAL implementation and the serializer to use.
>> > > > >  - runRecovery(Recoverable obj) : Recoverable is also an interface, with
>> > > > >    just one method, recover(tuple), which recovers a tuple read from the
>> > > > >    WAL.
>> > > > >  - setStart(WALPointer start) : set the start marker; during recovery the
>> > > > >    WAL will start reading tuples from this offset. The operator calls this
>> > > > >    method to specify that the data before the start pointer is not needed.
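A minimal in-memory sketch of the WALManager behaviour described above. Positions are plain long values standing in for WALPointer, the Recoverable interface mirrors the single recover(tuple) method, and the write() method and class name are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Mirrors the proposed Recoverable interface with its single recover(tuple) method.
interface Recoverable<T> {
  void recover(T tuple);
}

// Hypothetical WALManager behaviour, kept in memory: entries get increasing positions,
// setStart() discards everything before a position, runRecovery() replays the rest.
class InMemoryWALManager<T> {
  private final Deque<T> entries = new ArrayDeque<>();
  private long firstPosition; // position of the oldest retained entry

  // Appends a tuple and returns its position (a stand-in for a WALPointer).
  long write(T tuple) {
    entries.addLast(tuple);
    return firstPosition + entries.size() - 1;
  }

  // Data before 'start' is no longer needed; a file-based version could delete segments.
  void setStart(long start) {
    while (firstPosition < start && !entries.isEmpty()) {
      entries.removeFirst();
      firstPosition++;
    }
  }

  // Replays the retained tuples in write order.
  void runRecovery(Recoverable<T> target) {
    for (T tuple : entries) {
      target.recover(tuple);
    }
  }
}
```

Positions stay absolute after a purge, so a pointer saved before setStart() still identifies the same entry afterwards.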
>> > > > >
>> > > > >
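>> > > > > For example, an abstract implementation of RecoverableOperator could be:
>> > > > > ```java
>> > > > > import java.io.IOException;
>> > > > >
>> > > > > import com.datatorrent.api.Context;
>> > > > > import com.datatorrent.api.DefaultInputPort;
>> > > > > import com.datatorrent.api.Operator;
>> > > > > import com.datatorrent.common.util.BaseOperator;
>> > > > >
>> > > > > // WALManager, WALPointer, WAL.Recoverable and Serializer are the types
>> > > > > // proposed in this mail; they do not exist yet.
>> > > > > public abstract class RecoverableOperator<T> extends BaseOperator
>> > > > >   implements WAL.Recoverable<T>, Operator.CheckpointListener
>> > > > > {
>> > > > >   private WALManager<T> wm;          // configured by the application
>> > > > >   private Serializer<T> serializer;  // configured by the application
>> > > > >   // Data before this pointer is no longer needed for recovery; the subclass
>> > > > >   // advances it, e.g. after applying buffered data to persistent state.
>> > > > >   protected WALPointer pointer;
>> > > > >
>> > > > >   public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
>> > > > >   {
>> > > > >     @Override
>> > > > >     public void process(T t)
>> > > > >     {
>> > > > >       handleTuple(t, false);
>> > > > >     }
>> > > > >   };
>> > > > >
>> > > > >   @Override
>> > > > >   public void setup(Context.OperatorContext context)
>> > > > >   {
>> > > > >     wm.setup(context, serializer);
>> > > > >     /* build any in-memory state */
>> > > > >     wm.runRecovery(this);
>> > > > >   }
>> > > > >
>> > > > >   private void handleTuple(T tuple, boolean recovery)
>> > > > >   {
>> > > > >     // During normal processing, write the tuple to the WAL first.
>> > > > >     // During recovery the tuple came from the WAL, so don't write it again.
>> > > > >     if (!recovery) {
>> > > > >       try {
>> > > > >         wm.write(tuple); // assumes write() is declared to throw IOException
>> > > > >       } catch (IOException e) {
>> > > > >         throw new RuntimeException(e);
>> > > > >       }
>> > > > >     }
>> > > > >     processTuple(tuple);
>> > > > >   }
>> > > > >
>> > > > >   @Override
>> > > > >   public void recover(T tuple)
>> > > > >   {
>> > > > >     handleTuple(tuple, true);
>> > > > >   }
>> > > > >
>> > > > >   @Override
>> > > > >   public void checkpointed(long windowId)
>> > > > >   {
>> > > > >   }
>> > > > >
>> > > > >   @Override
>> > > > >   public void committed(long windowId)
>> > > > >   {
>> > > > >     // Move the WAL start forward once the data before the pointer is not needed.
>> > > > >     if (pointer != null) {
>> > > > >       wm.setStart(pointer);
>> > > > >     }
>> > > > >   }
>> > > > >
>> > > > >   protected abstract void processTuple(T tuple);
>> > > > > }
>> > > > > ```
>> > > > >
>> > > > > Let me know your thoughts.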
>> > > > >
>> > > > >  -Tushar.
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: [APEXMALHAR-1965] Create a WAL in Malhar

Posted by Tushar Gosavi <tu...@datatorrent.com>.
I have created a pull request with the changes
https://github.com/apache/incubator-apex-malhar/pull/204

-Tushar.


On Sat, Feb 27, 2016 at 12:24 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Tushar, any update on this?