You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Brendan Coffey <br...@ecofactor.com> on 2013/08/24 01:20:46 UTC

Can I replay events from the rolling file sink using the spooling directory source?

I'm using Flume 1.3.1, on both OS X 10.8.3 and CentOS 6.4.

I have an agent configuration in which, in some cases, events are
considered to be unprocessable, for potentially-transient reasons. In
this case, I want to be able to persist these events to disk, and to
"replay" them back into the agent later on. What I'm finding is that,
when I attempt to read the events back in, I'm not able to access the
event body in the way I expect.

I'm using the usual Java ByteStream dance to serialize an object into
bytes, and attach it as the Event body. Then I use the Rolling File
Sink to write the events to a file on disk. Later, I use the Spooling
Directory Source to read those files back.

However, I'm seeing java.io.EOFException when I try to deserialize the
objects out of the Event's body. Is there anything tricky to how the
file sink and file source handle the serialization?

Here's the sink's configuration:

efagent.sinks.persistenceSink.channel = persistenceChannel
efagent.sinks.persistenceSink.type = file_roll
efagent.sinks.persistenceSink.sink.directory =
/Library/Flume/Home/spool/failed_events
efagent.sinks.persistenceSink.sink.rollInterval = 0

And the source I'm trying to replay with:

efagent.sources.replaySource.type = spooldir
efagent.sources.replaySource.spoolDir = /Library/Flume/Home/spool/replay_spool
efagent.sources.replaySource.channels = mgmtChannel tenPackDataChannel
batchedTenPackDataChannel batchedSiteStatusChannel miscChannel
efagent.sources.replaySource.interceptors = decoder
efagent.sources.replaySource.interceptors.decoder.type =
com.ecofactor.collector.icontrol.utils.PersistedEventReplayInterceptor$Builder

Right now, the only thing the Interceptor is doing is printing out the
contents of the eevent message, and trying (and failing) to
deserialize it.

Please let me know if there's any other info I can provide to clarify
the situation.

Thanks!

Brendan Coffey
Senior Software Engineer
EcoFactor® Energy Management Solutions