You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by beargan <be...@hotmail.com> on 2015/11/22 08:33:19 UTC

(Unknown)

Sink

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

Compile and execute the following error:

2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at Toturial.Flume.MySink.process(MySink.java:54)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Unknown Source)

Re:

Posted by Hari Shreedharan <hs...@cloudera.com>.
I think there is a bug in the documentation that does not close the
transaction in the sink. You should look at any one of the sinks in the
code base and follow that in your own sink

On Sunday, November 22, 2015, beargan <be...@hotmail.com> wrote:

> Sink
>
>
>
> *public* *class* *MySink* *extends* AbstractSink *implements* Configurable *{*
>
>   *private* String myProp*;*
>
>
>
>   @Override
>
>   *public* *void* *configure**(*Context context*)* *{*
>
>     String myProp *=* context*.*getString*(*"myProp"*,* "defaultValue"*);*
>
>
>
>     *// Process the myProp value (e.g. validation)*
>
>
>
>     *// Store myProp for later retrieval by process() method*
>
>     *this**.*myProp *=* myProp*;*
>
>   *}*
>
>
>
>   @Override
>
>   *public* *void* *start**()* *{*
>
>     *// Initialize the connection to the external repository (e.g. HDFS) that*
>
>     *// this Sink will forward Events to ..*
>
>   *}*
>
>
>
>   @Override
>
>   *public* *void* *stop* *()* *{*
>
>     *// Disconnect from the external respository and do any*
>
>     *// additional cleanup (e.g. releasing resources or nulling-out*
>
>     *// field values) ..*
>
>   *}*
>
>
>
>   @Override
>
>   *public* Status *process**()* *throws* EventDeliveryException *{*
>
>     Status status *=* *null**;*
>
>
>
>     *// Start transaction*
>
>     Channel ch *=* getChannel*();*
>
>     Transaction txn *=* ch*.*getTransaction*();*
>
>     txn*.*begin*();*
>
>     *try* *{*
>
>       *// This try clause includes whatever Channel operations you want to do*
>
>
>
>       Event event *=* ch*.*take*();*
>
>
>
>       *// Send the Event to the external repository.*
>
>       *// storeSomeData(e);*
>
>
>
>       txn*.*commit*();*
>
>       status *=* Status*.*READY*;*
>
>     *}* *catch* *(*Throwable t*)* *{*
>
>       txn*.*rollback*();*
>
>
>
>       *// Log exception, handle individual exceptions as needed*
>
>
>
>       status *=* Status*.*BACKOFF*;*
>
>
>
>       *// re-throw all Errors*
>
>       *if* *(*t *instanceof* Error*)* *{*
>
>         *throw* *(*Error*)*t*;*
>
>       *}*
>
>     *}*
>
>     *return* status*;*
>
>   *}*
>
> *}*
>
>
>
> *Compile and execute the following error:*
>
>
>
> 2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
>
> java.lang.IllegalStateException: begin() called when transaction is
> COMPLETED!
>
>         at
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>
>         at
> org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>
>         at Toturial.Flume.MySink.process(MySink.java:54)
>
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>         at java.lang.Thread.run(Unknown Source)
>


-- 

Thanks,
Hari