Posted to user@flume.apache.org by JP <jp...@gmail.com> on 2012/08/03 05:26:24 UTC

Custom Serializer is not working

Hi,

I'm getting these errors:

2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] OPEN
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] BOUND: /localhost:41414
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] CONNECTED: /localhost:3770
2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943264.tmp
2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
        at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
        at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
        at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
        at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
        ... 13 more
2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943265.tmp
2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
        ... (same stack trace as above)
        at java.lang.Thread.run(Thread.java:662)

-----------------------------------------------------------------------------------------------------------------


This is my Avro schema (.avsc) file:

{ "type": "record", "name": "LogEvent", "namespace":
"org.apache.flume.serialization",
  "fields": [
    { "name": "srno",  "type": "int" },
    { "name": "severity",  "type": "int" },
    { "name": "timestamp", "type": "long" },
    { "name": "hostname",  "type": "string" },
    { "name": "message",   "type": "string" }
  ]
}
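
If the schema ever needs to be loaded from this .avsc file instead of the inline string embedded in the class below, a small helper could do it. This is a hypothetical sketch, assuming the file is packaged on the classpath as LogEvent.avsc and Avro 1.5+ (which provides Schema.Parser):

import java.io.InputStream;
import org.apache.avro.Schema;

public class LogEventSchemaLoader {
  // Parse the same schema from the .avsc resource rather than the
  // inline Schema.parse(...) string used in the generated class.
  public static Schema load() throws Exception {
    InputStream in =
        LogEventSchemaLoader.class.getResourceAsStream("/LogEvent.avsc");
    try {
      return new Schema.Parser().parse(in);
    } finally {
      in.close();
    }
  }
}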

------------------------------------------------------------------------------------------------

This is the LogEvent class, generated with the Avro Maven plugin and lightly customized:

@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {
  public static final Schema _SCHEMA =
Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
  public int srno;
  public String severity;
  public long timestamp;
  public String hostname;
  public String message;

  public Schema getSchema() { return _SCHEMA; }
  public Object get(int _field) {
    switch (_field) {
    case 0: return srno;
    case 1: return severity;
    case 2: return timestamp;
    case 3: return hostname;
    case 4: return message;
    default: throw new AvroRuntimeException("Bad index");
    }
  }

  @SuppressWarnings(value="unchecked")
  public void set(int _field, Object _value) {
    switch (_field) {
    case 0: srno = (Integer)_value; break;
    case 1: severity = (String)_value; break;
    case 2: timestamp = (Long)_value; break;
    case 3: hostname = (String)_value; break;
    case 4: message = (String)_value; break;
    default: throw new AvroRuntimeException("Bad index");
    }
  }

    public void setSrno(int srno) {
        this.srno = srno;
    }
    public void setSeverity(String s) {
        severity = s;
    }
    public String getSeverity() {
        return severity;
    }

    public void setTimestamp(long t) {
        timestamp = t;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setHostname(String h) {
        hostname = h;
    }

    public String getHostname() {
        return hostname;
    }

    public void setMessage(String m) {
        message = m;
    }

    public String getMessage() {
        return message;
    }

@Override
public void put(int field, Object value) {
    // TODO Auto-generated method stub

}
}
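
To sanity-check that the hand-edited record and its embedded schema agree, a quick standalone test can run a record through a SpecificDatumWriter. This is a sketch, not from the original post; it assumes Avro 1.x on the classpath and that it lives in the same package as LogEvent:

import java.io.ByteArrayOutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class LogEventSmokeTest {
  public static void main(String[] args) throws Exception {
    LogEvent e = new LogEvent();
    e.setSrno(1);
    e.setSeverity("INFO");
    e.setTimestamp(System.currentTimeMillis());
    e.setHostname("slcso-poc2-lnx");
    e.setMessage("Sample info message");

    // Encode with the specific writer; this fails fast if a field's
    // Java type does not match its declared Avro type.
    SpecificDatumWriter<LogEvent> writer =
        new SpecificDatumWriter<LogEvent>(LogEvent.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(e, enc);
    enc.flush();
    System.out.println("Encoded " + out.size() + " bytes");
  }
}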
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
agent2.sources = seqGenSrc
agent2.channels = memoryChannel
agent2.sinks = loggerSink


agent2.sources.seqGenSrc.type = avro
agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
agent2.sources.seqGenSrc.port=41414

#agent2.sources.seqGenSrc.interceptors = time hostInterceptor
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
#agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 1000000
agent2.channels.memoryChannel.transactionCapacity = 1000000
agent2.channels.memoryChannel.keep-alive = 30

agent2.sources.seqGenSrc.channels = memoryChannel

agent2.sinks.loggerSink.type = hdfs
#agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
agent2.sinks.loggerSink.hdfs.fileType = DataStream
#agent2.sinks.loggerSink.hdfs.writeFormat = Text

agent2.sinks.loggerSink.channel = memoryChannel
#agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
#agent2.sinks.loggerSink.serializer = avro_event
agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
agent2.sinks.loggerSink.serializer.compressionCodec = snappy
#agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000


-----------------------------------------------------------------------------------------------------------------
The following is my serializer class:

package org.apache.flume.serialization;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.source.SyslogUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;

public class CustomLogAvroEventSerializer extends
    AbstractAvroEventSerializer<LogEvent> {

  private static final DateTimeFormatter dateFmt1 =
      DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();

  private static final DateTimeFormatter dateFmt2 =
      DateTimeFormat.forPattern("MMM  d HH:mm:ss").withZoneUTC();

  private static final Logger logger =
      LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);

  private final OutputStream out;
  private final Schema schema;

  public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
    this.out = out;
    this.schema = new LogEvent().getSchema();
  }

  @Override
  protected OutputStream getOutputStream() {
    return out;
  }

  @Override
  protected Schema getSchema() {
    return schema;
  }

  // very simple rfc3164 parser
  @Override
  protected LogEvent convert(Event event) {
    LogEvent sle = new LogEvent();

    // Stringify body so it's easy to parse.
    // This is a pretty inefficient way to do it.
    String msg = new String(event.getBody(), Charsets.UTF_8);

    // parser read pointer
    int seek = 0;

    // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
    // which at the time of this writing only parses the priority.
    // This is a bit schizophrenic and it should parse all the fields or none.
    // NOTE: SYSLOG_SRNO is not a stock Flume constant; a customized
    // SyslogUtils is assumed here.
    Map<String, String> headers = event.getHeaders();
    boolean fromSyslogSource = false;
    if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
      fromSyslogSource = true;
      int srno = Integer.parseInt(headers.get(SyslogUtils.SYSLOG_SRNO));
      sle.setSrno(srno);
    } else {
      sle.setSrno(121);
    }
    if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
      fromSyslogSource = true;
      String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
      sle.setSeverity(severity);
    }

    // assume the message was received raw (maybe via NetcatSource)
    // and parse the priority string
    if (!fromSyslogSource) {
      if (msg.charAt(0) == '<') {
        int end = msg.indexOf(">");
        if (end > -1) {
          seek = end + 1;
          String priStr = msg.substring(1, end);
          sle.setSeverity(priStr);
        }
      }
    }

    // parse the timestamp
    String timestampStr = msg.substring(seek, seek + 15);
    long ts = parseRfc3164Date(timestampStr);
    if (ts != 0) {
      sle.setTimestamp(ts);
      seek += 15 + 1; // space after timestamp
    }

    // parse the hostname
    int nextSpace = msg.indexOf(' ', seek);
    if (nextSpace > -1) {
      String hostname = msg.substring(seek, nextSpace);
      sle.setHostname(hostname);
      seek = nextSpace + 1;
    }

    // everything else is the message
    String actualMessage = msg.substring(seek);
    sle.setMessage(actualMessage);

    logger.debug("Serialized event as: {}", sle);

    return sle;
  }

  private static long parseRfc3164Date(String in) {
    DateTime date = null;
    try {
      date = dateFmt1.parseDateTime(in);
    } catch (IllegalArgumentException e) {
      // ignore the exception; we act based on nullity of the date object
      logger.debug("Date parse failed on ({}), trying single-digit date", in);
    }

    if (date == null) {
      try {
        date = dateFmt2.parseDateTime(in);
      } catch (IllegalArgumentException e) {
        // ignore the exception; we act based on nullity of the date object
        logger.debug("2nd date parse failed on ({}), unknown date format", in);
      }
    }

    // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
    // rfc3164 dates are really dumb.
    // NB: cannot handle replaying of old logs or going back to the future
    if (date != null) {
      DateTime now = new DateTime();
      int year = now.getYear();
      DateTime corrected = date.withYear(year);

      // flume clock is ahead or there is some latency, and the year rolled
      if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
        corrected = date.withYear(year - 1);
      // flume clock is behind and the year rolled
      } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
        corrected = date.withYear(year + 1);
      }
      date = corrected;
    }

    if (date == null) {
      return 0;
    }

    return date.getMillis();
  }

  public static class Builder implements EventSerializer.Builder {

    @Override
    public EventSerializer build(Context context, OutputStream out) {
      CustomLogAvroEventSerializer writer = null;
      try {
        writer = new CustomLogAvroEventSerializer(out);
        writer.configure(context);
      } catch (IOException e) {
        logger.error("Unable to parse schema file. Exception follows.", e);
      }
      return writer;
    }
  }
}
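
As a worked example of convert() (hypothetical input, not from the original post): a raw event body of

    <13>Aug  2 16:59:02 slcso-poc2-lnx Sample info message

parses as severity = "13" (the <...> priority string, after which seek moves past the ">"), timestamp = the epoch millis for "Aug  2 16:59:02" in the current year (matched by dateFmt2, the single-digit-day pattern, in UTC), hostname = "slcso-poc2-lnx", and message = "Sample info message".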

Please advise: I need output like the following, and I want to customize it like log4j.
-------------------------------------------------------------------------------------------------------------------------------------
172  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
188  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message1
203  [main] WARN  com.cisco.flume.FlumeTest  - Sample warn message
219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message2
219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message3
266  [main] ERROR com.cisco.flume.FlumeTest  - Sample error message
282  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
282  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message4


-- 
JP




Re: Custom Serializer is not working

Posted by JP <jp...@gmail.com>.
Thanks Mubarak, it is working fine after adding:

agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
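
That also explains the stack traces above: when EventSerializerFactory cannot instantiate a Builder from the configured name, the HDFS sink ends up without a serializer, and HDFSDataStream.open then fails with the NullPointerException. A simplified sketch of the lookup (not Flume's actual source; behavior inferred from the 1.x API and the error message):

import java.io.OutputStream;
import org.apache.flume.Context;
import org.apache.flume.serialization.EventSerializer;

public class BuilderLookupSketch {
  // Hypothetical stand-in for the factory's handling of a custom serializer.
  public static EventSerializer getInstance(
      String serializerName, Context ctx, OutputStream out) throws Exception {
    // Built-in aliases such as "avro_event" or "text" are checked first
    // (elided here); anything else is treated as the fully qualified
    // class name of an EventSerializer.Builder.
    Class<?> clazz = Class.forName(serializerName);
    // "...CustomLogAvroEventSerializer" names the serializer itself, so
    // this cast fails and the factory logs "Unable to instantiate Builder
    // from ..."; "...CustomLogAvroEventSerializer$Builder" names the
    // nested builder class and succeeds.
    EventSerializer.Builder builder = (EventSerializer.Builder) clazz.newInstance();
    return builder.build(ctx, out);
  }
}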


On Fri, Aug 3, 2012 at 9:23 AM, Mubarak Seyed <se...@apple.com> wrote:

> Can you please try with:
>
> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder


-- 
JP

Re: Custom Serializer is not working

Posted by Mubarak Seyed <se...@apple.com>.
Can you please try with:

>> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder



Re: Custom Serializer is not working

Posted by JP <jp...@gmail.com>.
Thanks Mubarak,

I have already added the jar to lib.

Even after that I'm still getting the exception.

Any help to resolve this issue?

On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <se...@apple.com> wrote:

> Do you have a custom jar which contains
> org.apache.flume.serialization.CustomLogAvroEventSerializer in the Flume
> classpath? You can copy the custom jar file to the <FLUME_HOME>/lib directory.
>
> ERROR serialization.EventSerializerFactory: Unable to instantiate Builder
> from org.apache.flume.serialization.CustomLogAvroEventSerializer
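
For reference, "adding the jar to lib" means copying the custom jar into the agent's lib directory and restarting the agent so the class is on the classpath; the jar name below is hypothetical:

cp custom-log-serializer.jar $FLUME_HOME/lib/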
>
> On Aug 2, 2012, at 8:26 PM, JP wrote:
>
> HI,
>
> Im getting errros
>
> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
> started.
> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a,
> /localhost=> / localhost 41414 <http://10.105.39.202:41414/>] OPEN
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a,
> /localhost=> / localhost :41414 <http://10.105.39.202:41414/>] BOUND: /
> localhost :41414 <http://10.105.39.202:41414/>
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost
> :3770 <http://10.77.235.245:3770/> => / localhost :41414<http://10.105.39.202:41414/>]
> CONNECTED: / localhost :3770 <http://10.77.235.245:3770/>
> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://
> localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable
> to instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
>         at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
>         ... 13 more
> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://
> localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable
> to instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
>         at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>         at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
>
> -----------------------------------------------------------------------------------------------------------------
>
>
> This is my avro file
>
> { "type": "record", "name": "LogEvent", "namespace":
> "org.apache.flume.serialization",
>   "fields": [
>     { "name": "srno",  "type": "int" },
>     { "name": "severity",  "type": "int" },
>     { "name": "timestamp", "type": "long" },
>     { "name": "hostname",  "type": "string" },
>     { "name": "message",   "type": "string" }
>   ]
> }
>
>
> ------------------------------------------------------------------------------------------------
>
> This is the LogEvent created using maven-avro and little customized
>
> @SuppressWarnings("all")
> public class LogEvent extends SpecificRecordBase implements SpecificRecord
> {
>   public static final Schema _SCHEMA =
> Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
>   public int srno;
>   public String severity;
>   public long timestamp;
>   public String hostname;
>   public String message;
>
>   public Schema getSchema() { return _SCHEMA; }
>   public Object get(int _field) {
>     switch (_field) {
>     case 0: return srno;
>     case 1: return severity;
>     case 2: return timestamp;
>     case 3: return hostname;
>     case 4: return message;
>     default: throw new AvroRuntimeException("Bad index");
>     }
>   }
>
>   @SuppressWarnings(value="unchecked")
>   public void set(int _field, Object _value) {
>     switch (_field) {
>     case 0: srno = (Integer)_value; break;
>     case 1: severity = (String)_value; break;
>     case 2: timestamp = (Long)_value; break;
>     case 3: hostname = (String)_value; break;
>     case 4: message = (String)_value; break;
>     default: throw new AvroRuntimeException("Bad index");
>     }
>   }
>
>     public void setSrno(int srno) {
>         this.srno = srno;
>     }
>     public void setSeverity(String s) {
>         severity = s;
>     }
>     public String getSeverity() {
>         return severity;
>     }
>
>     public void setTimestamp(long t) {
>         timestamp = t;
>     }
>
>     public long getTimestamp() {
>         return timestamp;
>     }
>
>     public void setHostname(String h) {
>         hostname = h;
>     }
>
>     public String getHostname() {
>         return hostname;
>     }
>
>     public void setMessage(String m) {
>         message = m;
>     }
>
>     public String getMessage() {
>         return message;
>     }
>
> @Override
> public void put(int field, Object value) {
>     // TODO Auto-generated method stub
>
> }
> }
>
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
> agent2.sources = seqGenSrc
> agent2.channels = memoryChannel
> agent2.sinks = loggerSink
>
>
> agent2.sources.seqGenSrc.type = avro
> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
> agent2.sources.seqGenSrc.port=41414
>
> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
> #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>
> agent2.channels.memoryChannel.type = memory
> agent2.channels.memoryChannel.capacity = 1000000
> agent2.channels.memoryChannel.transactionCapacity = 1000000
> agent2.channels.memoryChannel.keep-alive = 30
>
> agent2.sources.seqGenSrc.channels = memoryChannel
>
> agent2.sinks.loggerSink.type = hdfs
> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
> agent2.sinks.loggerSink.hdfs.fileType = DataStream
> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>
> agent2.sinks.loggerSink.channel = memoryChannel
> #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
> #agent2.sinks.loggerSink.serializer = avro_event
> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
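>
> (To exercise the avro source end to end, a test event can be pushed with the Flume client SDK. A minimal sketch: the host and port come from the config above, and the rfc3164-style body is made up so that the serializer's convert() has something to parse.)
>
> import org.apache.flume.Event;
> import org.apache.flume.api.RpcClient;
> import org.apache.flume.api.RpcClientFactory;
> import org.apache.flume.event.EventBuilder;
>
> import com.google.common.base.Charsets;
>
> public class AvroSourceSmokeTest {
>   public static void main(String[] args) throws Exception {
>     RpcClient client = RpcClientFactory.getDefaultInstance("slcso-poc2-lnx", 41414);
>     try {
>       Event e = EventBuilder.withBody(
>           "<13>Aug  2 16:59:02 slcso-poc2-lnx Sample info message", Charsets.UTF_8);
>       client.append(e); // delivered to seqGenSrc, through memoryChannel, into the HDFS sink
>     } finally {
>       client.close();
>     }
>   }
> }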
>
>
>
> -----------------------------------------------------------------------------------------------------------------
> The following is my custom serializer class:
>
> package org.apache.flume.serialization;
>
> import java.io.IOException;
> import java.io.OutputStream;
> import java.util.Map;
>
> import org.apache.avro.Schema;
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> // assumes a customized SyslogUtils that defines SYSLOG_SRNO (stock Flume does not)
> import org.apache.flume.source.SyslogUtils;
> import org.joda.time.DateTime;
> import org.joda.time.format.DateTimeFormat;
> import org.joda.time.format.DateTimeFormatter;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.google.common.base.Charsets;
>
> public class CustomLogAvroEventSerializer extends
>         AbstractAvroEventSerializer<LogEvent> {
>
>       private static final DateTimeFormatter dateFmt1 =
>           DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>
>       private static final DateTimeFormatter dateFmt2 =
>           DateTimeFormat.forPattern("MMM  d HH:mm:ss").withZoneUTC();
>
>       private static final Logger logger =
>           LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>
>       private final OutputStream out;
>       private final Schema schema;
>
>       public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
>         this.out = out;
>         this.schema = new LogEvent().getSchema();
>       }
>
>       @Override
>       protected OutputStream getOutputStream() {
>         return out;
>       }
>
>       @Override
>       protected Schema getSchema() {
>         return schema;
>       }
>
>       // very simple rfc3164 parser
>       @Override
>       protected LogEvent convert(Event event) {
>           LogEvent sle = new LogEvent();
>
>         // Stringify body so it's easy to parse.
>         // This is a pretty inefficient way to do it.
>         String msg = new String(event.getBody(), Charsets.UTF_8);
>
>         // parser read pointer
>         int seek = 0;
>
>         // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
>         // which at the time of this writing only parses the priority.
>         // This is inconsistent; it should parse all the fields or none.
>         Map<String, String> headers = event.getHeaders();
>         boolean fromSyslogSource = false;
>         if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
>           fromSyslogSource = true;
>           // read the same header key that was tested above
>           int srno = Integer.parseInt(headers.get(SyslogUtils.SYSLOG_SRNO));
>           sle.setSrno(srno);
>         } else {
>           sle.setSrno(121);
>         }
>         if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
>           fromSyslogSource = true;
>           String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
>           sle.setSeverity(severity);
>         }
>
>         // assume the message was received raw (maybe via NetcatSource)
>         // parse the priority string
>         if (!fromSyslogSource) {
>           // guard against empty bodies before probing the priority bracket
>           if (!msg.isEmpty() && msg.charAt(0) == '<') {
>             int end = msg.indexOf(">");
>             if (end > -1) {
>               seek = end + 1;
>               String priStr = msg.substring(1, end);
>              // int priority = Integer.parseInt(priStr);
>              // String severity = priStr;
>
>               sle.setSeverity(priStr);
>             }
>           }
>         }
>
>         // parse the timestamp (guard against bodies shorter than an rfc3164 header)
>         long ts = 0;
>         if (msg.length() >= seek + 15) {
>           String timestampStr = msg.substring(seek, seek + 15);
>           ts = parseRfc3164Date(timestampStr);
>         }
>         if (ts != 0) {
>           sle.setTimestamp(ts);
>           seek += 15 + 1; // space after timestamp
>         }
>
>         // parse the hostname
>         int nextSpace = msg.indexOf(' ', seek);
>         if (nextSpace > -1) {
>           String hostname = msg.substring(seek, nextSpace);
>           sle.setHostname(hostname);
>           seek = nextSpace + 1;
>         }
>
>         // everything else is the message
>         String actualMessage = msg.substring(seek);
>         sle.setMessage(actualMessage);
>
>         logger.debug("Serialized event as: {}", sle);
>
>         return sle;
>       }
>
>       private static long parseRfc3164Date(String in) {
>             DateTime date = null;
>             try {
>               date = dateFmt1.parseDateTime(in);
>             } catch (IllegalArgumentException e) {
>               // ignore the exception, we act based on nullity of date object
>               logger.debug("Date parse failed on ({}), trying single-digit date", in);
>             }
>
>             if (date == null) {
>               try {
>                 date = dateFmt2.parseDateTime(in);
>               } catch (IllegalArgumentException e) {
>                 // ignore the exception, we act based on nullity of date object
>                 logger.debug("2nd date parse failed on ({}), unknown date format", in);
>               }
>             }
>
>             // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
>             // rfc3164 dates are really dumb.
>             // NB: cannot handle replaying of old logs or going back to the future
>             if (date != null) {
>               DateTime now = new DateTime();
>               int year = now.getYear();
>               DateTime corrected = date.withYear(year);
>
>               // flume clock is ahead or there is some latency, and the year rolled
>               if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
>                 corrected = date.withYear(year - 1);
>               // flume clock is behind and the year rolled
>               } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
>                 corrected = date.withYear(year + 1);
>               }
>               date = corrected;
>             }
>
>             if (date == null) {
>               return 0;
>             }
>
>             return date.getMillis();
>           }
>
>       public static class Builder implements EventSerializer.Builder {
>
>         @Override
>         public EventSerializer build(Context context, OutputStream out) {
>           CustomLogAvroEventSerializer writer = null;
>           try {
>             writer = new CustomLogAvroEventSerializer(out);
>             writer.configure(context);
>           } catch (IOException e) {
>             logger.error("Unable to parse schema file. Exception follows.", e);
>           }
>           return writer;
>         }
>       }
>
>
>
> }
>
> Please advise: I need output like the following, and I want to customize the format the way log4j does.
>
> -------------------------------------------------------------------------------------------------------------------------------------
> 172  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
> 188  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message1
> 203  [main] WARN  com.cisco.flume.FlumeTest  - Sample warn message
> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message2
> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message3
> 266  [main] ERROR com.cisco.flume.FlumeTest  - Sample error message
> 282  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
> 282  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message4
>
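> (For reference, that layout is what log4j 1.x's PatternLayout produces with the classic conversion pattern below; the appender wiring is a hypothetical sketch:)
>
> # log4j.properties sketch reproducing the sample layout above
> log4j.rootLogger=DEBUG, stdout
> log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> # %-4r = ms since startup, %t = thread, %-5p = level, %c = logger, %x = NDC, %m = message
> log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n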
>
> --
> JP
>


-- 
JP

Re: Custom Serializer is not working

Posted by Mubarak Seyed <se...@apple.com>.
Do you have a custom jar containing org.apache.flume.serialization.CustomLogAvroEventSerializer on the Flume classpath? You can copy the custom jar file to the <FLUME_HOME>/lib directory.

ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
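Note that this particular message is also logged when the class is found on the classpath but does not implement EventSerializer.Builder. The HDFS sink's serializer property accepts either a built-in alias or the fully-qualified class name of an EventSerializer.Builder implementation, so besides the jar location, the config likely needs to point at the nested Builder class:

agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder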
