Posted to user@flume.apache.org by beargan <be...@hotmail.com> on 2015/11/23 02:48:41 UTC

Re: Custom sink example MySink bug in documentation

Yes, I think so. The problem is that the close() method is never called on the transaction. The corrected code is as follows:

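import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {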
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(event);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      // You must add this call in order to close the Transaction.
      // Placing it in a finally block ensures it also runs when an Error is re-thrown.
      txn.close();
    }
    return status;
  }
}
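
To illustrate why the missing close() produces the "begin() called when transaction is COMPLETED!" error, here is a small stand-alone model. It is only a sketch and does not use Flume's classes: ModelChannel, ModelTransaction and runTwice are hypothetical names, and the model assumes (as the stack trace suggests) that the channel hands back the same transaction object on the same thread until that transaction has been closed.

```java
public class TransactionReuseDemo {
  // Simplified lifecycle states, assumed to mirror the ones named in the error message.
  enum State { NEW, OPEN, COMPLETED, CLOSED }

  // Hypothetical stand-in for a channel transaction; not Flume's implementation.
  static class ModelTransaction {
    State state = State.NEW;

    void begin() {
      if (state != State.NEW) {
        throw new IllegalStateException(
            "begin() called when transaction is " + state + "!");
      }
      state = State.OPEN;
    }

    void commit() {
      if (state != State.OPEN) {
        throw new IllegalStateException(
            "commit() called when transaction is " + state + "!");
      }
      state = State.COMPLETED;
    }

    void close() {
      state = State.CLOSED;
    }
  }

  // Hypothetical stand-in for a channel: it reuses the current transaction
  // until that transaction has been closed.
  static class ModelChannel {
    private ModelTransaction current;

    ModelTransaction getTransaction() {
      if (current == null || current.state == State.CLOSED) {
        current = new ModelTransaction();
      }
      return current;
    }
  }

  // Simulates two consecutive process() calls, with or without close().
  static String runTwice(boolean closeTransaction) {
    ModelChannel ch = new ModelChannel();
    try {
      for (int i = 0; i < 2; i++) {
        ModelTransaction txn = ch.getTransaction();
        txn.begin();
        try {
          txn.commit();
        } finally {
          if (closeTransaction) {
            txn.close();
          }
        }
      }
      return "ok";
    } catch (IllegalStateException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(runTwice(false)); // second begin() fails
    System.out.println(runTwice(true));  // both calls succeed
  }
}
```

Without close(), the second call receives the already completed transaction and begin() fails; with close(), the channel creates a fresh transaction for each call.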

I think there is a bug in the documentation: the example sink does not close the transaction. Look at any of the sinks in the code base and follow the same pattern in your own sink.

On Sunday, November 22, 2015, beargan <be...@hotmail.com> wrote:
Sink
 
public class MySink extends AbstractSink implements Configurable {
  private String myProp;
 
  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");
 
    // Process the myProp value (e.g. validation)
 
    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }
 
  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }
 
  @Override
  public void stop () {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }
 
  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;
 
    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
 
      Event event = ch.take();
 
      // Send the Event to the external repository.
      // storeSomeData(e);
 
      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();
 
      // Log exception, handle individual exceptions as needed
 
      status = Status.BACKOFF;
 
      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}
 
Compiling and running it produces the following error:
 
2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at Toturial.Flume.MySink.process(MySink.java:54)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Unknown Source)


-- 

Thanks,
Hari