You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by beargan <be...@hotmail.com> on 2015/11/22 08:33:19 UTC
(Unknown)
Sink
public class MySink extends AbstractSink implements Configurable {
private String myProp;
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");
// Process the myProp value (e.g. validation)
// Store myProp for later retrieval by process() method
this.myProp = myProp;
}
@Override
public void start() {
// Initialize the connection to the external repository (e.g. HDFS) that
// this Sink will forward Events to ..
}
@Override
public void stop () {
// Disconnect from the external respository and do any
// additional cleanup (e.g. releasing resources or nulling-out
// field values) ..
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
return status;
}
}
Compile and execute the following error:
2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at Toturial.Flume.MySink.process(MySink.java:54)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Unknown Source)
Re:
Posted by Hari Shreedharan <hs...@cloudera.com>.
I think there is a bug in the documentation that does not close the
transaction in the sink. You should look at any one of the sinks in the
code base and follow that in your own sink
On Sunday, November 22, 2015, beargan <be...@hotmail.com> wrote:
> Sink
>
>
>
> *public* *class* *MySink* *extends* AbstractSink *implements* Configurable *{*
>
> *private* String myProp*;*
>
>
>
> @Override
>
> *public* *void* *configure**(*Context context*)* *{*
>
> String myProp *=* context*.*getString*(*"myProp"*,* "defaultValue"*);*
>
>
>
> *// Process the myProp value (e.g. validation)*
>
>
>
> *// Store myProp for later retrieval by process() method*
>
> *this**.*myProp *=* myProp*;*
>
> *}*
>
>
>
> @Override
>
> *public* *void* *start**()* *{*
>
> *// Initialize the connection to the external repository (e.g. HDFS) that*
>
> *// this Sink will forward Events to ..*
>
> *}*
>
>
>
> @Override
>
> *public* *void* *stop* *()* *{*
>
> *// Disconnect from the external respository and do any*
>
> *// additional cleanup (e.g. releasing resources or nulling-out*
>
> *// field values) ..*
>
> *}*
>
>
>
> @Override
>
> *public* Status *process**()* *throws* EventDeliveryException *{*
>
> Status status *=* *null**;*
>
>
>
> *// Start transaction*
>
> Channel ch *=* getChannel*();*
>
> Transaction txn *=* ch*.*getTransaction*();*
>
> txn*.*begin*();*
>
> *try* *{*
>
> *// This try clause includes whatever Channel operations you want to do*
>
>
>
> Event event *=* ch*.*take*();*
>
>
>
> *// Send the Event to the external repository.*
>
> *// storeSomeData(e);*
>
>
>
> txn*.*commit*();*
>
> status *=* Status*.*READY*;*
>
> *}* *catch* *(*Throwable t*)* *{*
>
> txn*.*rollback*();*
>
>
>
> *// Log exception, handle individual exceptions as needed*
>
>
>
> status *=* Status*.*BACKOFF*;*
>
>
>
> *// re-throw all Errors*
>
> *if* *(*t *instanceof* Error*)* *{*
>
> *throw* *(*Error*)*t*;*
>
> *}*
>
> *}*
>
> *return* status*;*
>
> *}*
>
> *}*
>
>
>
> *Compile and execute the following error:*
>
>
>
> 2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
>
> java.lang.IllegalStateException: begin() called when transaction is
> COMPLETED!
>
> at
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>
> at
> org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>
> at Toturial.Flume.MySink.process(MySink.java:54)
>
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
> at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
> at java.lang.Thread.run(Unknown Source)
>
--
Thanks,
Hari