Posted to user@flume.apache.org by Andrew Jones <an...@gmail.com> on 2012/11/14 17:55:11 UTC

Custom sink - "close() called when transaction is OPEN" error

Hi,

I have a custom sink which has been working fine, but recently I have
started seeing this error in the logs:

Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN -
you must either commit or rollback first
        at
com.google.common.base.Preconditions.checkState(Preconditions.java:176)
...


After having a google and finding
https://issues.apache.org/jira/browse/FLUME-1089, I have double checked I
am using the correct try, catch, finally idiom that other sinks use, and I
seem to be doing the same. I do the following:

public Status process() throws EventDeliveryException {
    Status status = Status.READY;

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
        transaction.begin();

        // does a bit of processing and
        // writes out the event to MongoDB

        transaction.commit();

    } catch (Throwable t) {
        transaction.rollback();

        if (t instanceof Error) {
            throw (Error) t;
        } else if (t instanceof EventDeliveryException) {
            throw (EventDeliveryException) t;
        } else if (t instanceof ChannelException) {
            logger.error("Brodie Log Sink " + getName() + ": Unable to get event from" +
                " channel " + channel.getName() + ". Exception follows.", t);
            status = Status.BACKOFF;
        } else {
            throw new EventDeliveryException("Failed to send events", t);
        }
    } finally {
        transaction.close();
    }

    return status;
}

}

All of this code came from looking at other sinks (Avro and HDFS), so I am
pretty sure it's correct.

Can anyone see anything that might be a problem, or is there anything else
I can do to avoid this error?

Thanks,
Andrew

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Roshan Naik <ro...@hortonworks.com>.
Brock, I assume you mean "abort" when you say "destroy". If so, yes, I too
think that closing should abort an uncommitted transaction. I had the same
thought when reading through the memory channel implementation.

-roshan
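
For readers following along, the "close should abort" behaviour discussed
here could be sketched roughly as below. This is only an illustration of the
proposal: AbortingTransaction and leaveWithoutCommit are hypothetical names,
and nothing here is Flume's Transaction interface or its actual behaviour,
which (per the error in this thread) rejects close() on an open transaction.

```java
public class AbortOnCloseSketch {
    /** Hypothetical transaction whose close() aborts instead of throwing. */
    static final class AbortingTransaction implements AutoCloseable {
        private boolean open;
        private boolean rolledBack;

        void begin()    { open = true; }
        void commit()   { open = false; }
        void rollback() { open = false; rolledBack = true; }
        boolean wasRolledBack() { return rolledBack; }

        @Override
        public void close() {
            // Assumed semantics: an uncommitted transaction is rolled back
            // on close, similar in spirit to closing a JDBC connection.
            if (open) {
                rollback();
            }
        }
    }

    /** Leaves the block without commit() or rollback(); close() cleans up. */
    public static boolean leaveWithoutCommit() {
        AbortingTransaction tx = new AbortingTransaction();
        try (AbortingTransaction scoped = tx) {
            scoped.begin();
            // Early exit: neither commit() nor rollback() is called here.
        }
        return tx.wasRolledBack();
    }

    public static void main(String[] args) {
        System.out.println("rolled back on close: " + leaveWithoutCommit());
    }
}
```

Under these assumed semantics a forgotten commit or rollback would silently
become a rollback rather than an IllegalStateException, which is the
trade-off being debated in FLUME-1089.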



On Thu, Nov 15, 2012 at 4:50 AM, Brock Noland <br...@cloudera.com> wrote:

> Can you log the Throwable as the first thing in the catch block to see
> whether something is being thrown and, if so, what it is?
>
> Transactions are thread local, so if for some reason the sequencing
> gets messed up on an earlier call to process(), every call on the
> transaction will throw an exception, including begin().
>
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>
> As I stated in FLUME-1089, I think that when close() is called it should
> forcefully destroy the transaction, like JDBC close(), but I have not
> got much agreement.
>
>
> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <an...@gmail.com>
> wrote:
> > We are using Flume 1.2.0. We have a custom source, although it passes
> > through an Avro Sink and Source before getting to the sink. We are now
> > using the memory channel, although we had just switched from the JDBC
> > channel when we started seeing these errors, so maybe that has
> > something to do with it?
> >
> > I tried wrapping transaction.rollback(); in a try/catch and logging in
> > the catch, but it wasn't called, so I don't think the rollback is
> > throwing an error.
> >
> > I think it may have something to do with switching channels, as right
> > after Flume reloaded the config we started getting errors. I have
> > restarted the flume node manually and we are still getting the error.
> >
> > Thanks,
> > Andrew
> >
> >
> > On 14 November 2012 20:02, Hari Shreedharan <hs...@cloudera.com>
> > wrote:
> >>
> >> Which version of Flume are you using? It looks like the transaction was
> >> never rolled back or committed. It is likely that the rollback method
> >> too threw some exception, and the rollback was not successful. Also,
> >> what channel are you using?
> >>
> >>
> >> Thanks,
> >> Hari
> >>
> >> --
> >> Hari Shreedharan
> >>
> >
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce -
> http://incubator.apache.org/mrunit/
>

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Andrew Jones <an...@gmail.com>.
OK, got it. I had this:

Event event = channel.take();
if (event == null) {
    return Status.BACKOFF;
}

I changed it to:

if (event == null) {
    transaction.commit();
    return Status.BACKOFF;
}

And it looks like it has fixed the issue.

Thanks a lot for your help. Much appreciated!

Andrew
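
For anyone hitting the same error, here is a minimal, self-contained sketch
of why the early return caused it and why the extra commit() helps. It does
not use Flume: ToyTransaction is a hypothetical stand-in that models only
the rule from the error message (close() is rejected while the transaction
is open), a Queue stands in for the channel, and the status values are plain
strings.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class EmptyChannelFix {
    /** Hypothetical stand-in for a channel transaction; NOT Flume's class. */
    static final class ToyTransaction {
        private boolean open;

        void begin()    { open = true; }
        void commit()   { open = false; }
        void rollback() { open = false; }

        void close() {
            // Models only the check reported in this thread's stack trace.
            if (open) {
                throw new IllegalStateException(
                    "close() called when transaction is OPEN");
            }
        }
    }

    /** Original shape: the early return leaves the transaction open. */
    public static String processBefore(Queue<String> channel) {
        ToyTransaction tx = new ToyTransaction();
        try {
            tx.begin();
            String event = channel.poll();   // null when the channel is empty
            if (event == null) {
                return "BACKOFF";            // finally still runs close()
            }
            tx.commit();
            return "READY";
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();                      // throws on the empty-channel path
        }
    }

    /** Fixed shape: commit the empty transaction before returning. */
    public static String processAfter(Queue<String> channel) {
        ToyTransaction tx = new ToyTransaction();
        try {
            tx.begin();
            String event = channel.poll();
            if (event == null) {
                tx.commit();                 // nothing was taken, so nothing to undo
                return "BACKOFF";
            }
            tx.commit();
            return "READY";
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        try {
            processBefore(new ArrayDeque<>());
        } catch (IllegalStateException e) {
            System.out.println("before: " + e.getMessage());
        }
        System.out.println("after: " + processAfter(new ArrayDeque<>()));
    }
}
```

Note that in the "before" shape the exception is raised inside the finally
block, so the catch block (and therefore rollback()) never runs. That is
consistent with the earlier observation that logging around rollback()
showed nothing.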


On 16 November 2012 16:02, Brock Noland <br...@cloudera.com> wrote:

> If channel.take() returns null, no commit or rollback is called.
>
> Check out how the logger sink handles this:
>
>
> https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD
>
> brock
>
> On Fri, Nov 16, 2012 at 9:45 AM, Andrew Jones <an...@gmail.com>
> wrote:
> > Sure.
> >
> > Sink: http://pastebin.com/N6zh73hU
> > Config: http://pastebin.com/Tc2MH9iV
> >
> > Thanks.
> >
> >
> > On 16 November 2012 15:32, Brock Noland <br...@cloudera.com> wrote:
> >>
> >> Would you be able to send the source of your sink via pastebin in
> >> addition to your config?
> >>
> >> On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <an...@gmail.com>
> >> wrote:
> >> > I tried logging the first throwable, but now that is just the
> >> > IllegalStateException.
> >> >
> >> > Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
> >> > problem. This is using the Avro source, File channel and our custom
> >> > sink. After Flume reloads its config, the first error message comes
> >> > when the Avro source starts up:
> >> >
> >> > 16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
> >> > source: { bindAddress: 0.0.0.0, port: 36060 }...
> >> > 16 Nov 2012 16:04:25,484 ERROR
> >> > [SinkRunner-PollingRunner-DefaultSinkProcessor]
> >> > (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
> >> > event. Exception follows.
> >> > java.lang.IllegalStateException: close() called when transaction is OPEN -
> >> > you must either commit or rollback first
> >> >         at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> >> >         at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
> >> >         at com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
> >> >         at com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
> >> >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> >> >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> >> >         at java.lang.Thread.run(Thread.java:636)
> >> > 16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)  -
> >> > Monitoried counter group for type: SOURCE, name: source, registered
> >> > successfully.
> >> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
> >> > Component type: SOURCE, name: source started
> >> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> >> > (org.apache.flume.source.AvroSource.start:168)  - Avro source source
> >> > started.
> >> >
> >> > I then continually get errors from the Sink, presumably as it's been
> >> > called periodically to check for events in the channel. So is it
> >> > possible it's the Avro source causing the issue?
> >> >
> >> > There should have been nothing persisted in the file channel when
> >> > restarting.
> >> >
> >> > When the transaction gets messed up like this, is there a way to
> >> > refresh it, preferably without losing any data?
> >> >
> >> > I am still able to send things to flume and they get processed and
> >> > inserted by my sink, so it still seems to work OK.
> >> >
> >> > Thanks,
> >> > Andrew
> >> >
> >> >
> >> >

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Brock Noland <br...@cloudera.com>.
If channel.take() returns null, no commit or rollback is called.

Check out how the logger sink handles this:

https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD

brock
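
One way to avoid the problem structurally is to give process() a single exit,
so that every path through the try block reaches commit() or rollback()
before close(). The sketch below illustrates that shape only; it is not the
LoggerSink source. ToyTransaction is a hypothetical stand-in for a channel
transaction, a Queue stands in for the channel, and the status values are
plain strings.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SingleExitSketch {
    /** Hypothetical stand-in for a channel transaction; NOT Flume's class. */
    static final class ToyTransaction {
        private boolean open;

        void begin()    { open = true; }
        void commit()   { open = false; }
        void rollback() { open = false; }

        void close() {
            if (open) {
                throw new IllegalStateException(
                    "close() called when transaction is OPEN");
            }
        }
    }

    /** No early return: the empty-channel case only changes the status. */
    public static String process(Queue<String> channel) {
        String status = "READY";
        ToyTransaction tx = new ToyTransaction();
        try {
            tx.begin();
            String event = channel.poll();   // null when the channel is empty
            if (event != null) {
                System.out.println("Event: " + event);
            } else {
                status = "BACKOFF";          // ask the caller to back off
            }
            tx.commit();                     // reached on both branches
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
        return status;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("hello");
        System.out.println(process(channel));   // one event available
        System.out.println(process(channel));   // channel now empty
    }
}
```

Compared with committing inside the null check and returning early, this
keeps the commit in one place, so adding another branch later cannot
accidentally skip it.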




-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Andrew Jones <an...@gmail.com>.
Sure.

Sink: http://pastebin.com/N6zh73hU
Config: http://pastebin.com/Tc2MH9iV

Thanks.



Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Brock Noland <br...@cloudera.com>.
Would you be able to send the source of your sink via pastebin, in
addition to your config?

On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <an...@gmail.com> wrote:
> I tried logging the first throwable, but now that is just the
> IllegalStateException.
>
> Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
> problem. This is using the Avro source, File channel and our custom sink.
> After Flume reloads its config, the first error message comes when the Avro
> source starts up:
>
> 16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
> (org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
> source: { bindAddress: 0.0.0.0, port: 36060 }...
> 16 Nov 2012 16:04:25,484 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
> event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN -
> you must either commit or rollback first
>         at
> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>         at
> com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
>         at
> com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:636)
> 16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)  -
> Monitoried counter group for type: SOURCE, name: source, registered
> successfully.
> 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
> Component type: SOURCE, name: source started
> 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
> (org.apache.flume.source.AvroSource.start:168)  - Avro source source
> started.
>
> I then continually get errors from the sink, presumably because it's being
> called periodically to check for events in the channel. So is it possible
> that the Avro source is causing the issue?
>
> There should have been nothing persisted in the file channel when
> restarting.
>
> When the transaction gets messed up like this, is there a way to refresh it,
> preferably without losing any data?
>
> I am still able to send things to flume and they get processed and inserted
> by my sink, so it still seems to work OK.
>
> Thanks,
> Andrew



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Andrew Jones <an...@gmail.com>.
I tried logging the first throwable, but now that is just the
IllegalStateException.

Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
problem. This is using the Avro source, File channel and our custom sink.
After Flume reloads its config, the first error message comes when the Avro
source starts up:

16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
(org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
source: { bindAddress: 0.0.0.0, port: 36060 }...
16 Nov 2012 16:04:25,484 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN -
you must either commit or rollback first
        at
com.google.common.base.Preconditions.checkState(Preconditions.java:176)
        at
org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
        at
com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
        at
com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:636)
16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
(org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)  -
Monitoried counter group for type: SOURCE, name: source, registered
successfully.
16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
(org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
Component type: SOURCE, name: source started
16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
(org.apache.flume.source.AvroSource.start:168)  - Avro source source
started.

I then continually get errors from the sink, presumably because it's being
called periodically to check for events in the channel. So is it possible
that the Avro source is causing the issue?
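
For anyone hitting the same symptom: one code path that would fail on every
poll of an empty channel is an early return between begin() and commit(),
because the finally block then closes a transaction that is still open. The
following is only a sketch under that assumption. PollTx and EmptyPollSketch
are hypothetical stand-ins, not Flume classes, the queue stands in for the
channel, and nothing here is a confirmed diagnosis of the sink in this thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a channel transaction; not a Flume class.
class PollTx {
    private boolean open;

    void begin() { open = true; }
    void commit() { open = false; }
    void rollback() { open = false; }

    void close() {
        if (open) {
            throw new IllegalStateException(
                "close() called when transaction is OPEN - you must either commit or rollback first");
        }
    }
}

class EmptyPollSketch {
    // Returns early on an empty queue without commit() or rollback().
    static String buggy(Queue<String> channel) {
        PollTx tx = new PollTx();
        try {
            tx.begin();
            String event = channel.poll(); // null when nothing is queued
            if (event == null) {
                return "BACKOFF"; // transaction is still open here
            }
            tx.commit();
            return "READY";
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close(); // throws on the early-return path
        }
    }

    // Same flow, but the empty case commits before returning.
    static String fixed(Queue<String> channel) {
        PollTx tx = new PollTx();
        try {
            tx.begin();
            String event = channel.poll();
            if (event == null) {
                tx.commit();
                return "BACKOFF";
            }
            tx.commit();
            return "READY";
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(fixed(new ArrayDeque<String>()));
        System.out.println(buggy(new ArrayDeque<String>())); // throws IllegalStateException
    }
}
```

Note that the exception in buggy() comes from the finally block, so the catch
block in the same method never sees it.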

There should have been nothing persisted in the file channel when
restarting.

When the transaction gets messed up like this, is there a way to refresh
it, preferably without losing any data?

I am still able to send things to flume and they get processed and inserted
by my sink, so it still seems to work OK.

Thanks,
Andrew



On 15 November 2012 12:50, Brock Noland <br...@cloudera.com> wrote:

> Can you log the Throwable as the first thing in the catch block to see
> whether something is being thrown and, if so, what it is?
>
> Transactions are thread local, so if for some reason the sequencing
> gets messed up on an earlier call to process(), every later call on the
> transaction will throw an exception, including begin().
>
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>
> As I stated in FLUME-1089 I think that when close is called it should
> forcefully destroy the transaction like JDBC close() but I have not
> got much agreement.

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Brock Noland <br...@cloudera.com>.
Can you log the Throwable as the first thing in the catch block to see
whether something is being thrown and, if so, what it is?

Transactions are thread local, so if for some reason the sequencing
gets messed up on an earlier call to process(), every later call on the
transaction will throw an exception, including begin().

https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
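
As a rough illustration of the thread-local point, here is a simplified
model. It is not Flume's code: SketchTransaction, SketchChannel and
StuckTransactionDemo are hypothetical names, and the state checks only
approximate what BasicTransactionSemantics does. It shows a transaction that
was left OPEN being handed back to the same thread on the next call, so that
begin() fails as well.

```java
// Simplified model of thread-local transaction reuse; hypothetical classes.
class SketchTransaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;

    State getState() { return state; }

    void begin() {
        check(state == State.NEW, "begin() called when transaction is " + state);
        state = State.OPEN;
    }

    void commit() {
        check(state == State.OPEN, "commit() called when transaction is " + state);
        state = State.COMPLETED;
    }

    void rollback() {
        check(state == State.OPEN, "rollback() called when transaction is " + state);
        state = State.COMPLETED;
    }

    void close() {
        check(state == State.NEW || state == State.COMPLETED,
              "close() called when transaction is " + state
              + " - you must either commit or rollback first");
        state = State.CLOSED;
    }

    private static void check(boolean ok, String message) {
        if (!ok) {
            throw new IllegalStateException(message);
        }
    }
}

class SketchChannel {
    private final ThreadLocal<SketchTransaction> current = new ThreadLocal<>();

    // The calling thread gets its existing transaction back unless it is CLOSED.
    SketchTransaction getTransaction() {
        SketchTransaction tx = current.get();
        if (tx == null || tx.getState() == SketchTransaction.State.CLOSED) {
            tx = new SketchTransaction();
            current.set(tx);
        }
        return tx;
    }
}

class StuckTransactionDemo {
    // Leaves one transaction OPEN, then reports what the next begin() does.
    static String run() {
        SketchChannel channel = new SketchChannel();
        SketchTransaction first = channel.getTransaction();
        first.begin();
        try {
            first.close(); // neither commit() nor rollback() ran
        } catch (IllegalStateException expected) {
            // close() refused, so the state is still OPEN
        }
        SketchTransaction second = channel.getTransaction(); // same object again
        try {
            second.begin();
            return "no error";
        } catch (IllegalStateException e) {
            return (first == second) + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```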

As I stated in FLUME-1089, I think that when close() is called it should
forcefully destroy the transaction, like JDBC close(), but I have not
got much agreement.


On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <an...@gmail.com> wrote:
> We are using Flume 1.2.0. We have a custom source, although events pass
> through an Avro sink and source before getting to our sink. We are now using
> the memory channel, although we had just switched from the JDBC channel when
> we started seeing these errors, so maybe that has something to do with it?
>
> I tried wrapping transaction.rollback(); in a try catch and logging in the
> catch, but it wasn't called, so I don't think the rollback is throwing an
> error.
>
> I think it may have something to do with switching channels, as right after
> Flume reloaded the config we started getting errors. I have restarted the
> flume node manually and we are still getting the error.
>
> Thanks,
> Andrew



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Andrew Jones <an...@gmail.com>.
We are using Flume 1.2.0. We have a custom source, although events pass
through an Avro sink and source before getting to our sink. We are now
using the memory channel, although we had just switched from the JDBC
channel when we started seeing these errors, so maybe that has something
to do with it?

I tried wrapping transaction.rollback(); in a try/catch and logging in the
catch, but that catch block was never entered, so I don't think the rollback
is throwing an error.
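
For reference, a minimal sketch of that kind of check. TxCalls is a
hypothetical stand-in for the Transaction methods the sink calls (it is not
Flume's interface), and a list stands in for the logger so the example runs
on its own. The original Throwable is recorded first, and a failing
rollback() is recorded separately rather than replacing it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transaction calls a sink makes.
interface TxCalls {
    void begin();
    void commit();
    void rollback();
    void close();
}

class GuardedRollbackSketch {
    // Collected here instead of going to a real logger.
    static final List<String> log = new ArrayList<>();

    static String process(TxCalls tx, Runnable work) {
        try {
            tx.begin();
            work.run();
            tx.commit();
            return "READY";
        } catch (Throwable t) {
            // Record the original failure before anything can replace it.
            log.add("process failed: " + t.getMessage());
            try {
                tx.rollback();
            } catch (Throwable rollbackFailure) {
                // Seeing this entry means rollback() itself is the problem.
                log.add("rollback failed: " + rollbackFailure.getMessage());
            }
            return "BACKOFF";
        } finally {
            tx.close();
        }
    }
}
```

If neither entry shows up before the close() error, the catch block was
probably not entered at all, which points at a path that leaves the try block
without committing or rolling back.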

I think it may have something to do with switching channels, as right after
Flume reloaded the config we started getting errors. I have restarted the
flume node manually and we are still getting the error.

Thanks,
Andrew


On 14 November 2012 20:02, Hari Shreedharan <hs...@cloudera.com> wrote:

> Which version of Flume are you using? It looks like the transaction was
> never rolled back or committed. It is likely that the rollback method
> itself threw an exception, so the rollback did not succeed. Also, which
> channel are you using?
>
>
> Thanks,
> Hari
>
> --
> Hari Shreedharan

Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Hari Shreedharan <hs...@cloudera.com>.
Which version of Flume are you using? It looks like the transaction was never rolled back or committed. It is likely that the rollback method itself threw an exception, so the rollback did not succeed. Also, which channel are you using?
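
To illustrate why a failed rollback would surface as the close() error, here is a small standalone sketch. MaskTx and MaskingSketch are hypothetical stand-ins, not Flume classes. It relies only on standard Java behaviour: an exception thrown from a finally block replaces whatever was propagating out of the catch block.

```java
// Hypothetical stand-in for a channel transaction; not a Flume class.
class MaskTx {
    private boolean open;
    private final boolean rollbackFails;

    MaskTx(boolean rollbackFails) {
        this.rollbackFails = rollbackFails;
    }

    void begin() { open = true; }

    void rollback() {
        if (rollbackFails) {
            throw new RuntimeException("rollback failed"); // transaction stays open
        }
        open = false;
    }

    void close() {
        if (open) {
            throw new IllegalStateException(
                "close() called when transaction is OPEN - you must either commit or rollback first");
        }
    }
}

class MaskingSketch {
    // Runs the try/catch/finally idiom and returns the message the caller ends up seeing.
    static String messageSeenByCaller(boolean rollbackFails) {
        MaskTx tx = new MaskTx(rollbackFails);
        try {
            try {
                tx.begin();
                throw new RuntimeException("write to store failed");
            } catch (RuntimeException t) {
                tx.rollback(); // may throw and leave the transaction open
                throw t;
            } finally {
                tx.close(); // an exception here replaces the one in flight
            }
        } catch (RuntimeException seen) {
            return seen.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageSeenByCaller(false));
        System.out.println(messageSeenByCaller(true));
    }
}
```

With a working rollback the caller sees the original write failure. With a failing rollback the caller sees only the close() message, and both the write failure and the rollback failure are lost unless they were logged first.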


Thanks,
Hari

-- 
Hari Shreedharan


On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:

> Hi,
> 
> I have a custom sink which has been working fine, but recently I have started seeing this error in the logs:
> 
> Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>         at com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> 
> ...
> 
> 
> After having a google and finding https://issues.apache.org/jira/browse/FLUME-1089, I have double checked I am using the correct try, catch, finally idiom that other sinks use, and I seem to be doing the same. I do the following: 
> 
> public Status process() throws EventDeliveryException {
>   Status status = Status.READY;
>
>   Channel channel = getChannel();
>   Transaction transaction = channel.getTransaction();
>
>   try {
>     transaction.begin();
>
>     // does a bit of processing and
>     // writes out the event to MongoDB
>
>     transaction.commit();
>
>   } catch (Throwable t) {
>     transaction.rollback();
>
>     if (t instanceof Error) {
>       throw (Error) t;
>     } else if (t instanceof EventDeliveryException) {
>       throw (EventDeliveryException) t;
>     } else if (t instanceof ChannelException) {
>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from" +
>           " channel " + channel.getName() + ". Exception follows.", t);
>       status = Status.BACKOFF;
>     } else {
>       throw new EventDeliveryException("Failed to send events", t);
>     }
>   } finally {
>     transaction.close();
>   }
>
>   return status;
> }
>
> }
> 
> All of this code came from looking at other sinks (Avro and HDFS), so I am pretty sure its correct. 
> 
> Can anyone see anything that might be a problem, or is there anything else I can do to avoid this error?
> 
> Thanks,
> Andrew
> 
> 



Re: Custom sink - "close() called when transaction is OPEN" error

Posted by Roshan Naik <ro...@hortonworks.com>.
I am curious: which channel and source are you using?
By the way, dev may be the better mailing list for this question.
-roshan


On Wed, Nov 14, 2012 at 8:55 AM, Andrew Jones <an...@gmail.com> wrote:

> Hi,
>
> I have a custom sink which has been working fine, but recently I have
> started seeing this error in the logs:
>
> Unable to deliver event. Exception follows.
> java.lang.IllegalStateException: close() called when transaction is OPEN -
> you must either commit or rollback first
>         at
> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
> ...
>
>
> After having a google and finding
> https://issues.apache.org/jira/browse/FLUME-1089, I have double checked I
> am using the correct try, catch, finally idiom that other sinks use, and I
> seem to be doing the same. I do the following:
>
> public Status process() throws EventDeliveryException {
>   Status status = Status.READY;
>
>   Channel channel = getChannel();
>   Transaction transaction = channel.getTransaction();
>
>   try {
>     transaction.begin();
>
>     // does a bit of processing and
>     // writes out the event to MongoDB
>
>     transaction.commit();
>
>   } catch (Throwable t) {
>     transaction.rollback();
>
>     if (t instanceof Error) {
>       throw (Error) t;
>     } else if (t instanceof EventDeliveryException) {
>       throw (EventDeliveryException) t;
>     } else if (t instanceof ChannelException) {
>       logger.error("Brodie Log Sink " + getName() + ": Unable to get event from" +
>           " channel " + channel.getName() + ". Exception follows.", t);
>       status = Status.BACKOFF;
>     } else {
>       throw new EventDeliveryException("Failed to send events", t);
>     }
>   } finally {
>     transaction.close();
>   }
>
>   return status;
> }
>
> }
>
> All of this code came from looking at other sinks (Avro and HDFS), so I am
> pretty sure its correct.
>
> Can anyone see anything that might be a problem, or is there anything else
> I can do to avoid this error?
>
> Thanks,
> Andrew
>