Posted to dev@samza.apache.org by Thees Gieselmann <t....@mytaxi.com> on 2016/07/27 07:31:56 UTC

HdfsWriter opens a Bucket for every new file

Hello,

we are trying to write events to HDFS using the HdfsWriter provided by the
samza-hdfs package.
The latest patches in version 0.10.1 fixed the bug regarding closing files,
but the Bucketer does not seem to work with any of the provided HdfsWriter
implementations: every new event sent to the hdfsstream output system
creates a new file on HDFS. According to the documentation,
"systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864"
should take care of appending events to an existing file until the byte
threshold is reached.
Is this a known bug, or have I missed something in my implementation?

Code Snippet:


> # HDFS System
> systems.hdfsstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> systems.hdfsstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter
> systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864
> systems.hdfsstream.samza.msg.serde=json
>
> # The base dir for HDFS output, used by the default Bucketer for SequenceFile HdfsWriters
> systems.hdfsstream.producer.hdfs.base.output.dir=/user/hive/warehouse/foobar
>
> # Bucket into the following date path format
> systems.hdfsstream.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
> systems.hdfsstream.producer.hdfs.bucketer.date.path.format=yyyy
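For context, this is the on-HDFS layout we expected from the above config
(a sketch only; the file names are hypothetical placeholders, and the exact
path pattern is up to the Bucketer implementation):

> /user/hive/warehouse/foobar/<job-name>/2016/<file-1>.seq   <- appended to until ~64 MB
> /user/hive/warehouse/foobar/<job-name>/2016/<file-2>.seq   <- opened only after the threshold

Instead, a new file shows up for every single event.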


> private final SystemStream outputStream = new SystemStream("hdfsstream", "foobarTask");

> @Override
> public void process(IncomingMessageEnvelope envelope, MessageCollector messageCollector,
>                     TaskCoordinator taskCoordinator) throws ClassNotFoundException, SQLException
> {
>     final Timer.Context context = registry.timer("foobar").time();
>     try
>     {
>         String incoming = (String) envelope.getMessage();
>         GsonBuilder gsonBuilder = new GsonBuilder()
>                 .registerTypeAdapter(DateTime.class, new DateTimeConverter());
>         Gson gson = gsonBuilder.create();
>         SomeClass message = gson.fromJson(incoming, SomeClass.class);
>
>         try
>         {
>             messageCollector.send(new OutgoingMessageEnvelope(outputStream, gson.toJson(message)));
>             registry.counter("foobar").inc();
>         }
>         catch (Exception e)
>         {
>             LOGGER.error("error with message: ", e);
>             registry.counter("failedProcessCounter").inc();
>         }
>     }
>     finally
>     {
>         context.stop();
>     }
> }
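
For reference, below is a minimal Java sketch (our own illustration, not the
samza-hdfs source; the class and helper names are hypothetical) of the
roll-over behavior we expected from hdfs.write.batch.size.bytes:

> import java.io.IOException;
>
> // Sketch of the batching we expected: keep appending to the open file and
> // only roll to a new one once the configured byte threshold is exceeded.
> class ExpectedBatchingWriter {
>     private static final long MAX_BATCH_BYTES = 67108864L; // value from our config
>     private long bytesWritten = 0L;
>
>     void write(byte[] record) throws IOException {
>         if (bytesWritten + record.length > MAX_BATCH_BYTES) {
>             rollToNextFile(); // close the current SequenceFile, get the next path
>             bytesWritten = 0L;
>         }
>         append(record); // append to the file that is already open
>         bytesWritten += record.length;
>     }
>
>     private void rollToNextFile() throws IOException { /* hypothetical helper */ }
>
>     private void append(byte[] record) throws IOException { /* hypothetical helper */ }
> }

What we observe instead is a roll-over on every single send().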



Kind regards
Thees Gieselmann


Re: HdfsWriter opens a Bucket for every new file

Posted by Yi Pan <ni...@gmail.com>.
@Thees,

Could you open a JIRA to track this issue? And could you also describe the
issue in more specific detail in the JIRA? E.g., when you mentioned that
"HdfsWriter opens a Bucket for every new file", do you mean that HdfsWriter
opens a new file every time a new event is sent via HdfsSystemProducer?
Could you also attach the full config and container logs to the JIRA?

Thanks!

-Yi
