Posted to dev@metron.apache.org by Ryan Merriman <me...@gmail.com> on 2019/01/18 18:54:39 UTC

[DISCUSS] Writer class refactor

Recently there was a bug reported by a user where a parser that emits
multiple messages from a single tuple doesn't work correctly:
https://issues.apache.org/jira/browse/METRON-1968.  This has exposed a
problem with how the writer classes work.

The fundamental issue is this:  the writer classes operate under the
assumption that there is a 1 to 1 mapping between tuples and messages to be
written.  A couple of examples:

KafkaWriter
<https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L236>
- This class writes messages by iterating through the list of tuples and
fetching the message with the same index.  This is the cause of the Jira
above.  We could iterate through the message list instead but then we don't
know which tuples have been fully processed.  It would be possible for a
batch to be flushed before all messages from a tuple are passed to the
writer.
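
A minimal sketch of the index mismatch described above. This is not the
actual KafkaWriter code: the class name, the method name and the shape of the
two lists are assumptions made for illustration. It only shows what happens
when a loop walks the tuple list and fetches the message at the same index
while one tuple has produced more than one message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the writer loop; not the actual KafkaWriter code.
class IndexMismatchSketch {

  // Walks the tuple list and fetches the message stored at the same index.
  static List<String> writeByTupleIndex(List<String> tuples, List<String> messages) {
    List<String> written = new ArrayList<>();
    for (int i = 0; i < tuples.size(); i++) {
      written.add(messages.get(i));
    }
    return written;
  }

  public static void main(String[] args) {
    // Assumed input: tuple "t1" produced two messages, tuple "t2" produced one.
    List<String> tuples = Arrays.asList("t1", "t2");
    List<String> messages = Arrays.asList("t1-m1", "t1-m2", "t2-m1");
    // Prints [t1-m1, t1-m2]; the message from "t2" is never written.
    System.out.println(writeByTupleIndex(tuples, messages));
  }
}
```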

BulkWriterComponent
<https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java#L250>
- The tuple list size is used to determine when a batch should be flushed.
While inherently incorrect in my opinion (should be message list size),
this also causes an issue where only the first message from the last tuple
in a batch is written.
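
To make the difference between the two flush criteria concrete, here is a
small sketch. The helper names and the nested-list representation (one inner
list of messages per tuple) are hypothetical and do not mirror
BulkWriterComponent; the point is only that counting tuples and counting
messages give different answers once a tuple yields several messages.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; not the actual BulkWriterComponent logic.
class FlushThresholdSketch {

  // Flush decision based on how many tuples are buffered.
  static boolean shouldFlushByTuples(List<List<String>> messagesPerTuple, int batchSize) {
    return messagesPerTuple.size() >= batchSize;
  }

  // Flush decision based on how many messages are buffered across all tuples.
  static boolean shouldFlushByMessages(List<List<String>> messagesPerTuple, int batchSize) {
    int messageCount = 0;
    for (List<String> messages : messagesPerTuple) {
      messageCount += messages.size();
    }
    return messageCount >= batchSize;
  }

  public static void main(String[] args) {
    // Two tuples buffered: the first produced three messages, the second one.
    List<List<String>> buffered = Arrays.asList(
        Arrays.asList("m1", "m2", "m3"),
        Arrays.asList("m4"));
    System.out.println(shouldFlushByTuples(buffered, 4));   // false: only 2 tuples
    System.out.println(shouldFlushByMessages(buffered, 4)); // true: 4 messages
  }
}
```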

I do not believe there are easy fixes to these problems.  There is no way
to properly store the relationship between tuples and messages to be
written with the current BulkMessageWriter interface and BulkWriterResponse
class.  If we did have a way, how should we handle partial failures?  If
multiple messages are parsed from a tuple but only half of them are written
successfully, what should happen?  Should we replay the tuple?  Should we
just report the failed messages and continue on?  I think it may be a good
time to review our writer classes and consider a refactor.  Do others
agree?  Are there easy fixes I'm missing?

Assuming there is interest in refactoring, I will throw out some ideas for
consideration.  For those not as familiar with the writer classes, they are
organized as follows (in order from lowest to highest level):

Writers - These classes do the actual writing and implement the
BulkMessageWriter or MessageWriter interfaces.  There are 6 implementations
I can see including KafkaWriter, SolrWriter, ElasticsearchWriter,
HdfsWriter, etc.  There is also an implementation that adapts a
MessageWriter to a BulkMessageWriter (WriterToBulkWriter).  The result of a
writing operation is a BulkWriterResponse containing a list of either
successful or failed tuples.

Writer Containers - This includes the BulkWriterComponent and WriterHandler
classes.  These are responsible for batching and flushing messages,
handling errors and acking tuples.

Bolts - This includes ParserBolt, WriterBolt and BulkMessageWriterBolt.
These classes implement the Storm Bolt interfaces, set up writers/components
and execute tuples.

I think the first step is to reevaluate the separation of concerns for
these classes.  Here is how I would change from what we currently have:

Writers - These classes should only be concerned with writing messages and
reporting what happened.  They would also manage the lifecycle and
configuration of the underlying client libraries as they do now.  Instead
of accepting 2 separate lists, they should accept a data structure that
accurately represents the relationship between tuples and messages.

Writer Containers - These classes would continue to handle batching and
flushing but would only report the results of a flush rather than actually
doing the acking or error handling.

Bolts - These would now be responsible for acking and error reporting on
tuples.  They would transform a tuple into something the Writer Containers
can accept as input.

I think working through this and adjusting the contracts between the
different layers will be necessary to fix the bugs described above.  While
we're at it I think there are other improvements we could also make:

Decouple Storm - It would be beneficial to remove the dependency on tuples
in our writers and writer containers.  We could replace this with a simple
abstraction (an id would probably work fine).  This will allow us to more
easily port Metron to other streaming platforms.
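
One possible shape for such an abstraction, as a rough sketch only. Every
name below (IdentifiedMessage, WriteReport, IdBasedBulkWriter,
InMemoryWriter) is hypothetical and does not exist in Metron; payloads are
plain strings to keep the example self-contained. The writer sees an opaque
id instead of a tuple and reports per-message outcomes, leaving acking to
whoever owns the id.

```java
import java.util.ArrayList;
import java.util.List;

// All names here are hypothetical; they are not existing Metron classes.
final class IdentifiedMessage {
  final String id;       // opaque id chosen by the caller, e.g. one per source tuple
  final String payload;  // the message to write

  IdentifiedMessage(String id, String payload) {
    this.id = id;
    this.payload = payload;
  }
}

// Outcome of one bulk write, with no reference to Storm types.
final class WriteReport {
  final List<IdentifiedMessage> succeeded = new ArrayList<>();
  final List<IdentifiedMessage> failed = new ArrayList<>();
}

interface IdBasedBulkWriter {
  WriteReport write(List<IdentifiedMessage> batch);
}

// Toy writer that rejects empty payloads, so the report shows both outcomes.
class InMemoryWriter implements IdBasedBulkWriter {
  final List<String> store = new ArrayList<>();

  @Override
  public WriteReport write(List<IdentifiedMessage> batch) {
    WriteReport report = new WriteReport();
    for (IdentifiedMessage message : batch) {
      if (message.payload.isEmpty()) {
        report.failed.add(message);
      } else {
        store.add(message.payload);
        report.succeeded.add(message);
      }
    }
    return report;
  }
}
```

Several messages may share the same id, which is how one tuple producing
many messages would be represented in this sketch.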

Remove MessageWriter Interface - This is not being actively used as far as
I can tell.  Is that true?  Removing this will make our code simpler and
easier to follow (WriterHandler and WriterToBulkWriter classes can probably
go away).  I don't see any reason future writers, even those without bulk
writing capabilities, could not fit into the BulkMessageWriter interface.
A writer could either iterate through messages and write one at a time or
throw an exception.  As far as I know, the writer interfaces are not
something we advertise as extension points.  Is that true?
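
As an illustration of the "iterate and write one at a time" option, here is a
small adapter sketch. SingleMessageWriter and IteratingBulkWriter are
hypothetical names invented for this example; this is not the code of the
existing WriterToBulkWriter class, and messages are plain strings for
brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-message writer contract, for illustration only.
interface SingleMessageWriter {
  void write(String message) throws Exception;
}

// Hypothetical adapter: presents a one-at-a-time writer through a bulk-style method.
class IteratingBulkWriter {
  private final SingleMessageWriter delegate;

  IteratingBulkWriter(SingleMessageWriter delegate) {
    this.delegate = delegate;
  }

  // Writes each message in turn and returns the ones that failed.
  List<String> writeAll(List<String> messages) {
    List<String> failed = new ArrayList<>();
    for (String message : messages) {
      try {
        delegate.write(message);
      } catch (Exception e) {
        // A failure on one message does not stop the rest of the batch.
        failed.add(message);
      }
    }
    return failed;
  }
}
```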

Consolidate our BulkMessageWriterBolt and WriterBolt classes - Is there any
reason we need both?

I'll add another item to the list that I consider absolutely necessary:  we
need better tests.  None of our integration tests or unit tests catch these
bugs.

This is a complex issue and there is a lot of information to process.  I
realize there are upgrade complications that may come with some of these
and probably other things I haven't thought of.  I will pause here and wait
for feedback or provide more clarification if needed.  In summary, here is
the feedback I'm requesting:

   - We have a problem with our writers.  Is there an easy fix or should we
   consider a broader refactor?
   - How should partial failures be handled when multiple messages are
   produced from a single tuple?  This could be tricky because we might not
   know there were failures until after some messages have already been
   written.
   - If we do decide to reevaluate our writer classes, what should the
   separation of concerns be?
   - Do we want to include other changes that may be optional but could
   improve our code?  Some of these may even make the refactor easier.

If someone does have an easy fix, we can work through that next.  Otherwise
we can go further into details and work on designing how the interfaces
should look after we make some high level decisions.  From there I think
we'll have a clear picture of how a refactor would look.  Thanks in advance
for your input.

Ryan

Re: [DISCUSS] Writer class refactor

Posted by Ryan Merriman <me...@gmail.com>.
Thanks Mike, very helpful to have all that context.  I'm in agreement with
everything you've said.  Accepting duplicates may be a tradeoff we have to
make to keep performance high.

Your comments are centered around Kafka but how does this apply to other
writers?  Since we're now handling multiple messages that come from a
single tuple, how should we handle partial failures?  The flush() method on
the Kafka writer waits until all messages have been written so we don't
have to worry about partial failures/successes there.  What about the ES
writer?  The way it's implemented now it returns a status of which messages
were successfully written and which messages were not.  Is it possible to
make a bulk write with the ES client an atomic operation?  If not I think
we'll have to accept duplicates (if we're not already).  I think this is an
issue in general for any writer we may implement and we need to be clear
about how messages are acked in this case.

I agree with your suggestion of using Map<Tuple, List<JSONObject>> but have
a small change I would like to propose.  Instead of Tuple can we use a
transaction id (String type)?  I would prefer to see Storm dependencies be
moved up to the bolts.  Maintaining a relationship of Tuples to transaction
ids there should be trivial.
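
A rough sketch of how a bolt could keep that relationship, assuming a
hypothetical TransactionRegistry class (not part of Metron or Storm). The
type parameter T stands in for the Storm Tuple so the example has no Storm
dependency; using a random UUID as the transaction id is also an assumption.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bolt-side registry; T stands in for the Storm Tuple type.
class TransactionRegistry<T> {
  private final Map<String, T> pending = new ConcurrentHashMap<>();

  // Remembers the tuple and returns the transaction id handed to the writer layer.
  String register(T tuple) {
    String transactionId = UUID.randomUUID().toString();
    pending.put(transactionId, tuple);
    return transactionId;
  }

  // Returns the tuple to ack or fail and forgets it; null if the id is unknown.
  T complete(String transactionId) {
    return pending.remove(transactionId);
  }

  int pendingCount() {
    return pending.size();
  }
}
```

With something like this, the writers and writer containers would only ever
see String ids, and the bolt would call complete() when a flush result comes
back to find the tuple it needs to ack or fail.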


Re: [DISCUSS] Writer class refactor

Posted by "Zeolla@GMail.com" <ze...@gmail.com>.
Totally on board with everybody's comments above this point.

Jon

-- 

Jon Zeolla

Re: [DISCUSS] Writer class refactor

Posted by Michael Miklavcic <mi...@gmail.com>.
Thanks for the write up, Ryan. I had to touch on some of this when
refactoring the kafka writer away from the async model so we could
guarantee delivery. We had potential to drop messages before that change
because of the async producer calls, which would ack the Storm tuple as
soon as the writer returned.

   - https://github.com/apache/metron/pull/1045

We'll want to talk about these fixes/updates in context of our message
delivery semantics, both in Storm and Kafka. As it currently stands, we do
NOT use Storm Trident, which means we have at-least-once message processing
in Storm. There is an inherent possibility that we will publish duplicate
messages in some instances. From a Kafka perspective, we have the same
issue. As of Kafka 0.11.0, they provide a way to get exactly-once
semantics, but I'm not sure we've done much to explicitly achieve that.

   - https://kafka.apache.org/10/documentation.html#semantics

From a Kafka delivery guarantee perspective, it appears we're currently
setting the number of required acks to 1 by default. This means we get commit
confirmation as soon as the leader has written the message to its local
log. In this case, should the leader fail immediately after acknowledging
the record but before the followers have replicated it, the record will
be lost. We could investigate setting acks=all (equivalent to acks=-1), but
this would be a tradeoff in performance for us.

   -
   https://github.com/apache/metron/blob/341960b91f8fe742d5cf947633b7edd2275587d5/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L87
   - https://kafka.apache.org/10/documentation/#producerconfigs
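
For reference, a minimal sketch of the two settings being compared. The
"acks" and "bootstrap.servers" keys are standard Kafka producer configs; the
KafkaAcksSketch class, the producerProps method and the broker address used
in any call are placeholders for illustration and are not how KafkaWriter
builds its configuration.

```java
import java.util.Properties;

// Hypothetical helper; only builds the properties, it does not create a producer.
class KafkaAcksSketch {

  static Properties producerProps(String bootstrapServers, boolean waitForAllReplicas) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    // "1": the leader alone acknowledges the write.
    // "all" (same as "-1"): the leader waits for the full set of in-sync replicas.
    props.setProperty("acks", waitForAllReplicas ? "all" : "1");
    return props;
  }
}
```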

Per the KafkaProducer documentation, the flush() command makes all buffered
records available to send and blocks until the requests for those records
have completed, either with success (acked) or with an error. "A request is
considered completed when it is successfully acknowledged according to the
acks configuration you have specified or else it results in an error." The
outcome of each individual record is then available from the Future
returned by send().

   -
   https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

With this combination of factors, I believe we can continue to guarantee
at-least-once semantics in the writer, regardless of batch size. To your
point about not passing 2 separate lists, I suggest that we modify the API
by passing in something like Map<Tuple, List<JSONObject>> so that the
tuples always get acked with respect to their messages. This way we can
avoid the tuple-message batch boundary problem by ensuring we only ack a
tuple when all associated messages are successfully written to Kafka.
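
A minimal sketch of that idea, assuming a generic key type T in place of
Storm's Tuple and plain strings in place of JSONObject so the example has no
external dependencies. MessageSink, TupleBatchWriter and
writeAndCollectAckable are hypothetical names, not existing Metron classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the client that performs the actual write.
interface MessageSink {
  // Returns true if the message was written successfully.
  boolean write(String message);
}

class TupleBatchWriter {

  // Attempts to write every message of every tuple in the batch and returns
  // the tuples that are safe to ack, i.e. those whose messages all succeeded.
  // Tuples missing from the result had at least one failed message.
  static <T> List<T> writeAndCollectAckable(Map<T, List<String>> batch,
                                            MessageSink sink) {
    List<T> ackable = new ArrayList<>();
    for (Map.Entry<T, List<String>> entry : batch.entrySet()) {
      boolean allWritten = true;
      for (String message : entry.getValue()) {
        if (!sink.write(message)) {
          allWritten = false;
        }
      }
      if (allWritten) {
        ackable.add(entry.getKey());
      }
    }
    return ackable;
  }
}
```

Because the messages travel with their tuple, a flush can never split a
tuple's messages across batches. If a tuple that is not acked gets replayed,
its messages that were already written would be written again, which is
consistent with at-least-once delivery.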

Best,
Mike



Re: [DISCUSS] Writer class refactor

Posted by Otto Fowler <ot...@gmail.com>.
Agreed



Re: [DISCUSS] Writer class refactor

Posted by Ryan Merriman <me...@gmail.com>.
I am on board with that.  In that case, I think it's even more important
that we get the Writer interfaces right.


Re: [DISCUSS] Writer class refactor

Posted by Otto Fowler <ot...@gmail.com>.
I think that the writers should be loaded as, and act as extension points,
such that it is possible to have 3rd party writers, and would structure
them as such.


