Posted to dev@samza.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2015/07/10 21:36:43 UTC

Question on newBlockingQueue in BlockingEnvelopeMap

Hi Samza devs and users,

I wrote a custom Samza S3 consumer that downloads files from S3 and puts
messages into a BlockingEnvelopeMap. It was straightforward because there's a
nice example, filereader. I tried a small optimization in the
newBlockingQueue() method: I guessed that a single shared queue would be
fine because the Samza container is single-threaded. I added the following code:


    public S3Consumer(String systemName, Config config, MetricsRegistry registry) {
        queueSize = config.getInt("systems." + systemName + ".queue.size", 10000);
        bucket = config.get("systems." + systemName + ".bucket");
        prefix = config.get("systems." + systemName + ".prefix");

        queue = new LinkedBlockingQueue<>(queueSize);

        recordCounter = registry.newCounter(this.getClass().getName(), "processed_records");
    }

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return queue; // single queue
    }

Unfortunately, I observed significant message loss with this
implementation. I suspected the queue might be dropping messages, so I
changed the newBlockingQueue() implementation to match filereader's:

    @Override
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<>(queueSize);
    }

After that change, the message loss stopped.

Do you have any idea why it went wrong?

Thank you
Best, Jae

Re: Question on newBlockingQueue in BlockingEnvelopeMap

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
I expected it not to lose messages, but it did. After I fixed the
overridden method, the problem went away.

Anyway, thanks a lot for responding.
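A minimal, self-contained Java model of why the second override behaves differently from the first. It is not Samza code: it assumes, as BlockingEnvelopeMap appears to do, that the base class calls newBlockingQueue() once per registered partition and stores the result in a map keyed by that partition. The names QueuePerPartitionDemo, register and distinctQueues are hypothetical, and partitions are plain strings rather than SystemStreamPartition objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical model, not Samza code.
class QueuePerPartitionDemo {
    // Assumed register() behaviour: newBlockingQueue() is called once per partition
    // and the result is stored under that partition's key.
    static Map<String, BlockingQueue<String>> register(
            List<String> partitions, Supplier<BlockingQueue<String>> newBlockingQueue) {
        Map<String, BlockingQueue<String>> buffered = new HashMap<>();
        for (String partition : partitions) {
            buffered.putIfAbsent(partition, newBlockingQueue.get());
        }
        return buffered;
    }

    // Counts distinct queue objects by identity, not by contents.
    static int distinctQueues(Map<String, BlockingQueue<String>> buffered) {
        Set<BlockingQueue<String>> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(buffered.values());
        return seen.size();
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("p0", "p1", "p2");
        int queueSize = 10000;

        // Like the first override: every call returns the same queue.
        BlockingQueue<String> shared = new LinkedBlockingQueue<>(queueSize);
        System.out.println(distinctQueues(register(partitions, () -> shared))); // prints 1

        // Like the filereader-style override: every call returns a new queue.
        System.out.println(distinctQueues(
                register(partitions, () -> new LinkedBlockingQueue<>(queueSize)))); // prints 3
    }
}
```

With the shared override, every partition key points at the same queue object, so the per-partition map gives no separation at all. The fresh override gives each partition its own buffer, which is what a per-partition map of queues is built around.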

On Fri, Jul 10, 2015 at 2:11 PM, Yan Fang <ya...@gmail.com> wrote:

> Hi Jae,
>
> I think the messages are not "lost"; instead, with your "shared queue"
> implementation they all go to one partition.
>
> If you check the code in BlockingEnvelopeMap line 123
> <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
> you'll see that all the messages in the queue end up in one partition.
>
> Thanks,
>
> Fang, Yan
> yanfang724@gmail.com

Re: Question on newBlockingQueue in BlockingEnvelopeMap

Posted by Yan Fang <ya...@gmail.com>.
Hi Jae,

I think the messages are not "lost"; instead, with your "shared queue"
implementation they all go to one partition.

If you check the code in BlockingEnvelopeMap line 123
<https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java#L123>,
you'll see that all the messages in the queue end up in one partition.
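To make the "all go to one partition" effect concrete, here is a small Java model of the register/put/poll cycle. It is hypothetical and heavily simplified, not the real BlockingEnvelopeMap: it assumes that poll() drains each registered partition's queue in turn and files whatever it drained under that partition. It also uses a non-blocking add() instead of a blocking put() to keep the sketch short. MiniEnvelopeMap, Envelope and run are illustrative names only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical, heavily simplified model of a BlockingEnvelopeMap-style consumer (not Samza code).
class MiniEnvelopeMap {
    // An envelope remembers the partition it really came from.
    static final class Envelope {
        final String partition;
        final String message;

        Envelope(String partition, String message) {
            this.partition = partition;
            this.message = message;
        }
    }

    // Insertion-ordered, so poll() visits partitions in registration order.
    private final Map<String, BlockingQueue<Envelope>> buffered = new LinkedHashMap<>();
    private final Supplier<BlockingQueue<Envelope>> newBlockingQueue;

    MiniEnvelopeMap(Supplier<BlockingQueue<Envelope>> newBlockingQueue) {
        this.newBlockingQueue = newBlockingQueue;
    }

    // Assumed register() behaviour: one newBlockingQueue() call per partition.
    void register(String partition) {
        buffered.putIfAbsent(partition, newBlockingQueue.get());
    }

    // Appends to the queue registered for the envelope's own partition.
    // add() instead of a blocking put() keeps the sketch free of checked exceptions.
    void put(Envelope envelope) {
        buffered.get(envelope.partition).add(envelope);
    }

    // Drains each partition's queue and files the result under that partition.
    Map<String, List<Envelope>> poll() {
        Map<String, List<Envelope>> result = new LinkedHashMap<>();
        for (Map.Entry<String, BlockingQueue<Envelope>> entry : buffered.entrySet()) {
            List<Envelope> drained = new ArrayList<>();
            entry.getValue().drainTo(drained);
            if (!drained.isEmpty()) {
                result.put(entry.getKey(), drained);
            }
        }
        return result;
    }

    // Registers three partitions, puts one envelope into each, then polls once.
    static Map<String, List<Envelope>> run(Supplier<BlockingQueue<Envelope>> factory) {
        MiniEnvelopeMap map = new MiniEnvelopeMap(factory);
        for (String partition : List.of("p0", "p1", "p2")) {
            map.register(partition);
        }
        map.put(new Envelope("p0", "a"));
        map.put(new Envelope("p1", "b"));
        map.put(new Envelope("p2", "c"));
        return map.poll();
    }

    public static void main(String[] args) {
        BlockingQueue<Envelope> shared = new LinkedBlockingQueue<>(10000);
        // Shared queue: p0 is drained first and receives all three envelopes.
        System.out.println(run(() -> shared).keySet()); // prints [p0]
        // Fresh queue per partition: every envelope stays under its own partition.
        System.out.println(run(() -> new LinkedBlockingQueue<>(10000)).keySet()); // prints [p0, p1, p2]
    }
}
```

With the shared queue, the first partition polled drains every envelope, so p1 and p2 appear to receive nothing even though no envelope was discarded. The model stops at poll(); it does not try to reproduce what the container then does with the mis-filed envelopes, which is presumably where the apparent loss showed up.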

Thanks,

Fang, Yan
yanfang724@gmail.com
