Posted to dev@samza.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2015/07/10 21:36:43 UTC
Question on newBlockingQueue in BlockingEnvelopeMap
Hi Samza devs and users,
I wrote a custom Samza S3 consumer that downloads files from S3 and puts
messages into BlockingEnvelopeMap. It was straightforward because there's a
nice example, filereader. I tried to optimize it a little through the
newBlockingQueue() method, because I guessed that a single shared queue would
be fine given that the Samza container is single-threaded. I added the
following code:
    public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
        queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
        bucket = config.get("systems." + systemName + ".bucket");
        prefix = config.get("systems." + systemName + ".prefix");

        // Single instance, returned by every newBlockingQueue() call below.
        queue = new LinkedBlockingQueue<>(queueSize);

        recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
    }

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return queue; // single queue
    }
Unfortunately, I observed significant message loss with this
implementation. I suspected that the queue might have dropped messages, so I
changed the newBlockingQueue() implementation to match filereader's:
    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<>(queueSize);
    }
After that, the message loss did not happen again.
Do you have any idea why it went wrong?
Thank you
Best, Jae
Re: Question on newBlockingQueue in BlockingEnvelopeMap
Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I expected that it would not lose messages, but it did. After I fixed the
overridden method, the problem went away.
Anyway, thanks a lot for responding.
On Fri, Jul 10, 2015 at 2:11 PM, Yan Fang <ya...@gmail.com> wrote:
> Hi Jae,
>
> I think the messages are not "lost", instead, they all go to one partition,
> in your "shared queue" implementation.
>
> If you check the code in BlockingEnvelopeMap line 123
> <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
> it puts all the messages in the queue in one partition.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com
Re: Question on newBlockingQueue in BlockingEnvelopeMap
Posted by Yan Fang <ya...@gmail.com>.
Hi Jae,
I think the messages are not "lost"; instead, in your "shared queue"
implementation they all go to one partition.
If you check the code in BlockingEnvelopeMap, line 123
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
it puts all the messages in the queue into one partition.
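To make the effect concrete, here is a minimal sketch of the pattern described above. It is not the real Samza code: MiniEnvelopeMap, its integer partition keys, and SharedQueueDemo are hypothetical stand-ins, and the sketch only assumes that one queue is requested per registered partition and that polling drains each partition's queue into that partition's result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for BlockingEnvelopeMap (not the Samza class).
class MiniEnvelopeMap {
    private final Map<Integer, BlockingQueue<String>> queues = new LinkedHashMap<>();
    private final Supplier<BlockingQueue<String>> queueFactory;

    MiniEnvelopeMap(Supplier<BlockingQueue<String>> queueFactory) {
        this.queueFactory = queueFactory;
    }

    // Asks the factory for a queue for each newly registered partition,
    // playing the role of newBlockingQueue().
    void register(int partition) {
        queues.putIfAbsent(partition, queueFactory.get());
    }

    void put(int partition, String message) {
        queues.get(partition).add(message);
    }

    // Drains each partition's queue and files the result under that partition.
    Map<Integer, List<String>> poll() {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, BlockingQueue<String>> entry : queues.entrySet()) {
            List<String> drained = new ArrayList<>();
            entry.getValue().drainTo(drained);
            result.put(entry.getKey(), drained);
        }
        return result;
    }
}

class SharedQueueDemo {
    static Map<Integer, List<String>> run(boolean shared) {
        final BlockingQueue<String> single = new LinkedBlockingQueue<String>(100);
        Supplier<BlockingQueue<String>> factory;
        if (shared) {
            factory = () -> single;                               // same queue every time
        } else {
            factory = () -> new LinkedBlockingQueue<String>(100); // fresh queue per partition
        }
        MiniEnvelopeMap map = new MiniEnvelopeMap(factory);
        map.register(0);
        map.register(1);
        map.put(0, "a0");
        map.put(1, "b1");
        return map.poll();
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // {0=[a0, b1], 1=[]}
        System.out.println(run(false)); // {0=[a0], 1=[b1]}
    }
}
```

With the shared queue, whichever partition is drained first receives every buffered message and the other partition sees nothing, which would look like loss from that partition's point of view.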
Thanks,
Fang, Yan
yanfang724@gmail.com