You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by JP <jp...@gmail.com> on 2012/08/03 05:26:24 UTC
Custom Serializer is not working
HI,
I'm getting errors
2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
started.
2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=>
/ localhost 41414 <http://10.105.39.202:41414>] OPEN
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=>
/ localhost :41414 <http://10.105.39.202:41414>] BOUND: / localhost
:41414<http://10.105.39.202:41414>
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost
:3770 <http://10.77.235.245:3770> => / localhost
:41414<http://10.105.39.202:41414>]
CONNECTED: / localhost :3770 <http://10.77.235.245:3770>
2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://
localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable
to instantiate Builder from
org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
at
org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
at
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
... 13 more
2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://
localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable
to instantiate Builder from
org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
at
org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
at
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
-----------------------------------------------------------------------------------------------------------------
This is my avro file
{ "type": "record", "name": "LogEvent", "namespace":
"org.apache.flume.serialization",
"fields": [
{ "name": "srno", "type": "int" },
{ "name": "severity", "type": "int" },
{ "name": "timestamp", "type": "long" },
{ "name": "hostname", "type": "string" },
{ "name": "message", "type": "string" }
]
}
------------------------------------------------------------------------------------------------
This is the LogEvent class generated with the Avro Maven plugin and slightly customized
@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {
public static final Schema _SCHEMA =
Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
public int srno;
public String severity;
public long timestamp;
public String hostname;
public String message;
public Schema getSchema() { return _SCHEMA; }
public Object get(int _field) {
switch (_field) {
case 0: return srno;
case 1: return severity;
case 2: return timestamp;
case 3: return hostname;
case 4: return message;
default: throw new AvroRuntimeException("Bad index");
}
}
@SuppressWarnings(value="unchecked")
public void set(int _field, Object _value) {
switch (_field) {
case 0: srno = (Integer)_value; break;
case 1: severity = (String)_value; break;
case 2: timestamp = (Long)_value; break;
case 3: hostname = (String)_value; break;
case 4: message = (String)_value; break;
default: throw new AvroRuntimeException("Bad index");
}
}
public void setSrno(int srno) {
this.srno = srno;
}
public void setSeverity(String s) {
severity = s;
}
public String getSeverity() {
return severity;
}
public void setTimestamp(long t) {
timestamp = t;
}
public long getTimestamp() {
return timestamp;
}
public void setHostname(String h) {
hostname = h;
}
public String getHostname() {
return hostname;
}
public void setMessage(String m) {
message = m;
}
public String getMessage() {
return message;
}
@Override
public void put(int field, Object value) {
// TODO Auto-generated method stub
}
}
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
agent2.sources = seqGenSrc
agent2.channels = memoryChannel
agent2.sinks = loggerSink
agent2.sources.seqGenSrc.type = avro
agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
agent2.sources.seqGenSrc.port=41414
#agent2.sources.seqGenSrc.interceptors = time hostInterceptor
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.type =
org.apache.flume.interceptor.HostInterceptor$Builder
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
= false
#agent2.sources.seqGenSrc.interceptors.time.type =
org.apache.flume.interceptor.TimestampInterceptor$Builder
agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 1000000
agent2.channels.memoryChannel.transactionCapacity = 1000000
agent2.channels.memoryChannel.keep-alive = 30
agent2.sources.seqGenSrc.channels = memoryChannel
agent2.sinks.loggerSink.type = hdfs
#agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
agent2.sinks.loggerSink.hdfs.fileType = DataStream
#agent2.sinks.loggerSink.hdfs.writeFormat = Text
agent2.sinks.loggerSink.channel = memoryChannel
#agent2.sinks.loggerSink.serializer =
org.apache.flume.serialization.BodyTextEventSerializer
#agent2.sinks.loggerSink.serializer = avro_event
agent2.sinks.loggerSink.serializer =
org.apache.flume.serialization.CustomLogAvroEventSerializer
agent2.sinks.loggerSink.serializer.compressionCodec = snappy
#agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
agent2.channels.memoryChannel.type = memory
~
-----------------------------------------------------------------------------------------------------------------
The following is my class
public class CustomLogAvroEventSerializer extends
AbstractAvroEventSerializer<LogEvent> {
private static final DateTimeFormatter dateFmt1 =
DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
private static final DateTimeFormatter dateFmt2 =
DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
private static final Logger logger =
LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
private final OutputStream out;
private final Schema schema;
public CustomLogAvroEventSerializer(OutputStream out) throws
IOException {
this.out = out;
this.schema =new LogEvent().getSchema();;
}
@Override
protected OutputStream getOutputStream() {
return out;
}
@Override
protected Schema getSchema() {
return schema;
}
// very simple rfc3164 parser
@Override
protected LogEvent convert(Event event) {
LogEvent sle = new LogEvent();
// Stringify body so it's easy to parse.
// This is a pretty inefficient way to do it.
String msg = new String(event.getBody(), Charsets.UTF_8);
// parser read pointer
int seek = 0;
// Check Flume headers to see if we came from SyslogTcp(or UDP)
Source,
// which at the time of this writing only parses the priority.
// This is a bit schizophrenic and it should parse all the fields
or none.
Map<String, String> headers = event.getHeaders();
boolean fromSyslogSource = false;
if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
fromSyslogSource = true;
int srno = Integer.parseInt(headers.get("srno"));
sle.setSrno(srno);
}else{
sle.setSrno(121);
}
if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
fromSyslogSource = true;
String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
sle.setSeverity(severity);
}
// assume the message was received raw (maybe via NetcatSource)
// parse the priority string
if (!fromSyslogSource) {
if (msg.charAt(0) == '<') {
int end = msg.indexOf(">");
if (end > -1) {
seek = end + 1;
String priStr = msg.substring(1, end);
// int priority = Integer.parseInt(priStr);
// String severity = priStr;
sle.setSeverity(priStr);
}
}
}
// parse the timestamp
String timestampStr = msg.substring(seek, seek + 15);
long ts = parseRfc3164Date(timestampStr);
if (ts != 0) {
sle.setTimestamp(ts);
seek += 15 + 1; // space after timestamp
}
// parse the hostname
int nextSpace = msg.indexOf(' ', seek);
if (nextSpace > -1) {
String hostname = msg.substring(seek, nextSpace);
sle.setHostname(hostname);
seek = nextSpace + 1;
}
// everything else is the message
String actualMessage = msg.substring(seek);
sle.setMessage(actualMessage);
logger.debug("Serialized event as: {}", sle);
return sle;
}
private static long parseRfc3164Date(String in) {
DateTime date = null;
try {
date = dateFmt1.parseDateTime(in);
} catch (IllegalArgumentException e) {
// ignore the exception, we act based on nullity of date
object
logger.debug("Date parse failed on ({}), trying single-digit
date", in);
}
if (date == null) {
try {
date = dateFmt2.parseDateTime(in);
} catch (IllegalArgumentException e) {
// ignore the exception, we act based on nullity of date
object
logger.debug("2nd date parse failed on ({}), unknown date
format", in);
}
}
// hacky stuff to try and deal with boundary cases, i.e. new
year's eve.
// rfc3164 dates are really dumb.
// NB: cannot handle replaying of old logs or going back to the
future
if (date != null) {
DateTime now = new DateTime();
int year = now.getYear();
DateTime corrected = date.withYear(year);
// flume clock is ahead or there is some latency, and the
year rolled
if (corrected.isAfter(now) &&
corrected.minusMonths(1).isAfter(now)) {
corrected = date.withYear(year - 1);
// flume clock is behind and the year rolled
} else if (corrected.isBefore(now) &&
corrected.plusMonths(1).isBefore(now)) {
corrected = date.withYear(year + 1);
}
date = corrected;
}
if (date == null) {
return 0;
}
return date.getMillis();
}
public static class Builder implements EventSerializer.Builder {
@Override
public EventSerializer build(Context context, OutputStream out)
{
CustomLogAvroEventSerializer writer = null;
try {
writer = new CustomLogAvroEventSerializer(out);
writer.configure(context);
} catch (IOException e) {
logger.error("Unable to parse schema file. Exception
follows.", e);
}
return writer;
}
}
}
Please advise — I need output like the following, and I want to customize it like log4j
-------------------------------------------------------------------------------------------------------------------------------------
172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
--
JP
--
JP
Re: Custom Serializer is not working
Posted by JP <jp...@gmail.com>.
Thanks Mubarak it is working fine after adding
agent2.sinks.loggerSink.
serializer =
org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
On Fri, Aug 3, 2012 at 9:23 AM, Mubarak Seyed <se...@apple.com> wrote:
> can you please try with
>
> agent2.sinks.loggerSink.serializer =
>> org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
>>
>>
> On Aug 2, 2012, at 8:39 PM, JP wrote:
>
> Thanks Mubarak,
>
> I have added the jar to lib .
>
> After that only im getting the exception.
>
> Any help to resolve this issue.
>
> On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <se...@apple.com> wrote:
>
>> Do you have a custom jar which contains
>> org.apache.flume.serialization.CustomLogAvroEventSerializer in flume
>> classpath? You can copy the custom jar file to the <FLUME_HOME>/lib directory.
>>
>> ERROR serialization.EventSerializerFactory: Unable to instantiate Builder
>> from org.apache.flume.serialization.CustomLogAvroEventSerializer
>>
>> On Aug 2, 2012, at 8:26 PM, JP wrote:
>>
>> HI,
>>
>> I'm getting errors
>>
>> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
>> started.
>> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a,
>> /localhost=> / localhost 41414 <http://10.105.39.202:41414/>] OPEN
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a,
>> /localhost=> / localhost :41414 <http://10.105.39.202:41414/>] BOUND: /
>> localhost :41414 <http://10.105.39.202:41414/>
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost
>> :3770 <http://10.77.235.245:3770/> => / localhost :41414<http://10.105.39.202:41414/>]
>> CONNECTED: / localhost :3770 <http://10.77.235.245:3770/>
>> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://
>> localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
>> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory:
>> Unable to instantiate Builder from
>> org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>> at
>> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>> at
>> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>> at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> at java.lang.Thread.run(Thread.java:662)
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
>> ... 13 more
>> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://
>> localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
>> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory:
>> Unable to instantiate Builder from
>> org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>> at
>> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>> at
>> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>> at
>> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>> at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> at java.lang.Thread.run(Thread.java:662)
>>
>>
>> -----------------------------------------------------------------------------------------------------------------
>>
>>
>> This is my avro file
>>
>> { "type": "record", "name": "LogEvent", "namespace":
>> "org.apache.flume.serialization",
>> "fields": [
>> { "name": "srno", "type": "int" },
>> { "name": "severity", "type": "int" },
>> { "name": "timestamp", "type": "long" },
>> { "name": "hostname", "type": "string" },
>> { "name": "message", "type": "string" }
>> ]
>> }
>>
>>
>> ------------------------------------------------------------------------------------------------
>>
>> This is the LogEvent created using maven-avro and little customized
>>
>> @SuppressWarnings("all")
>> public class LogEvent extends SpecificRecordBase implements
>> SpecificRecord {
>> public static final Schema _SCHEMA =
>> Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
>> public int srno;
>> public String severity;
>> public long timestamp;
>> public String hostname;
>> public String message;
>>
>> public Schema getSchema() { return _SCHEMA; }
>> public Object get(int _field) {
>> switch (_field) {
>> case 0: return srno;
>> case 1: return severity;
>> case 2: return timestamp;
>> case 3: return hostname;
>> case 4: return message;
>> default: throw new AvroRuntimeException("Bad index");
>> }
>> }
>>
>> @SuppressWarnings(value="unchecked")
>> public void set(int _field, Object _value) {
>> switch (_field) {
>> case 0: srno = (Integer)_value; break;
>> case 1: severity = (String)_value; break;
>> case 2: timestamp = (Long)_value; break;
>> case 3: hostname = (String)_value; break;
>> case 4: message = (String)_value; break;
>> default: throw new AvroRuntimeException("Bad index");
>> }
>> }
>>
>> public void setSrno(int srno) {
>> this.srno = srno;
>> }
>> public void setSeverity(String s) {
>> severity = s;
>> }
>> public String getSeverity() {
>> return severity;
>> }
>>
>> public void setTimestamp(long t) {
>> timestamp = t;
>> }
>>
>> public long getTimestamp() {
>> return timestamp;
>> }
>>
>> public void setHostname(String h) {
>> hostname = h;
>> }
>>
>> public String getHostname() {
>> return hostname;
>> }
>>
>> public void setMessage(String m) {
>> message = m;
>> }
>>
>> public String getMessage() {
>> return message;
>> }
>>
>> @Override
>> public void put(int field, Object value) {
>> // TODO Auto-generated method stub
>>
>> }
>> }
>>
>> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>> agent2.sources = seqGenSrc
>> agent2.channels = memoryChannel
>> agent2.sinks = loggerSink
>>
>>
>> agent2.sources.seqGenSrc.type = avro
>> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
>> agent2.sources.seqGenSrc.port=41414
>>
>> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type =
>> org.apache.flume.interceptor.HostInterceptor$Builder
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
>> = false
>> #agent2.sources.seqGenSrc.interceptors.time.type =
>> org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> agent2.channels.memoryChannel.type = memory
>> agent2.channels.memoryChannel.capacity = 1000000
>> agent2.channels.memoryChannel.transactionCapacity = 1000000
>> agent2.channels.memoryChannel.keep-alive = 30
>>
>> agent2.sources.seqGenSrc.channels = memoryChannel
>>
>> agent2.sinks.loggerSink.type = hdfs
>> #agent2.sinks.loggerSink.hdfs.path = hdfs://
>> 10.105.39.204:8020/data/CspcLogs
>> agent2.sinks.loggerSink.hdfs.path =
>> hdfs://slcso-poc4-lnx:8020/data/cssplogs
>> agent2.sinks.loggerSink.hdfs.fileType = DataStream
>> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>>
>> agent2.sinks.loggerSink.channel = memoryChannel
>> #agent2.sinks.loggerSink.serializer =
>> org.apache.flume.serialization.BodyTextEventSerializer
>> #agent2.sinks.loggerSink.serializer = avro_event
>> agent2.sinks.loggerSink.serializer =
>> org.apache.flume.serialization.CustomLogAvroEventSerializer
>> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
>> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
>> agent2.channels.memoryChannel.type = memory
>> ~
>>
>>
>>
>> -----------------------------------------------------------------------------------------------------------------
>> The following is my class
>>
>> public class CustomLogAvroEventSerializer extends
>> AbstractAvroEventSerializer<LogEvent> {
>>
>> private static final DateTimeFormatter dateFmt1 =
>> DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>>
>> private static final DateTimeFormatter dateFmt2 =
>> DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
>>
>>
>> private static final Logger logger =
>> LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>>
>> private final OutputStream out;
>> private final Schema schema;
>>
>> public CustomLogAvroEventSerializer(OutputStream out) throws
>> IOException {
>> this.out = out;
>> this.schema =new LogEvent().getSchema();;
>> }
>>
>> @Override
>> protected OutputStream getOutputStream() {
>> return out;
>> }
>>
>> @Override
>> protected Schema getSchema() {
>> return schema;
>> }
>>
>> // very simple rfc3164 parser
>> @Override
>> protected LogEvent convert(Event event) {
>> LogEvent sle = new LogEvent();
>>
>> // Stringify body so it's easy to parse.
>> // This is a pretty inefficient way to do it.
>> String msg = new String(event.getBody(), Charsets.UTF_8);
>>
>> // parser read pointer
>> int seek = 0;
>>
>> // Check Flume headers to see if we came from SyslogTcp(or UDP)
>> Source,
>> // which at the time of this writing only parses the priority.
>> // This is a bit schizophrenic and it should parse all the fields
>> or none.
>> Map<String, String> headers = event.getHeaders();
>> boolean fromSyslogSource = false;
>> if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
>> fromSyslogSource = true;
>> int srno = Integer.parseInt(headers.get("srno"));
>> sle.setSrno(srno);
>> }else{
>> sle.setSrno(121);
>> }
>> if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
>> fromSyslogSource = true;
>> String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
>> sle.setSeverity(severity);
>> }
>>
>> // assume the message was received raw (maybe via NetcatSource)
>> // parse the priority string
>> if (!fromSyslogSource) {
>> if (msg.charAt(0) == '<') {
>> int end = msg.indexOf(">");
>> if (end > -1) {
>> seek = end + 1;
>> String priStr = msg.substring(1, end);
>> // int priority = Integer.parseInt(priStr);
>> // String severity = priStr;
>>
>> sle.setSeverity(priStr);
>> }
>> }
>> }
>>
>> // parse the timestamp
>> String timestampStr = msg.substring(seek, seek + 15);
>> long ts = parseRfc3164Date(timestampStr);
>> if (ts != 0) {
>> sle.setTimestamp(ts);
>> seek += 15 + 1; // space after timestamp
>> }
>>
>> // parse the hostname
>> int nextSpace = msg.indexOf(' ', seek);
>> if (nextSpace > -1) {
>> String hostname = msg.substring(seek, nextSpace);
>> sle.setHostname(hostname);
>> seek = nextSpace + 1;
>> }
>>
>> // everything else is the message
>> String actualMessage = msg.substring(seek);
>> sle.setMessage(actualMessage);
>>
>> logger.debug("Serialized event as: {}", sle);
>>
>> return sle;
>> }
>>
>> private static long parseRfc3164Date(String in) {
>> DateTime date = null;
>> try {
>> date = dateFmt1.parseDateTime(in);
>> } catch (IllegalArgumentException e) {
>> // ignore the exception, we act based on nullity of date
>> object
>> logger.debug("Date parse failed on ({}), trying
>> single-digit date", in);
>> }
>>
>> if (date == null) {
>> try {
>> date = dateFmt2.parseDateTime(in);
>> } catch (IllegalArgumentException e) {
>> // ignore the exception, we act based on nullity of date
>> object
>> logger.debug("2nd date parse failed on ({}), unknown date
>> format", in);
>> }
>> }
>>
>> // hacky stuff to try and deal with boundary cases, i.e. new
>> year's eve.
>> // rfc3164 dates are really dumb.
>> // NB: cannot handle replaying of old logs or going back to
>> the future
>> if (date != null) {
>> DateTime now = new DateTime();
>> int year = now.getYear();
>> DateTime corrected = date.withYear(year);
>>
>> // flume clock is ahead or there is some latency, and the
>> year rolled
>> if (corrected.isAfter(now) &&
>> corrected.minusMonths(1).isAfter(now)) {
>> corrected = date.withYear(year - 1);
>> // flume clock is behind and the year rolled
>> } else if (corrected.isBefore(now) &&
>> corrected.plusMonths(1).isBefore(now)) {
>> corrected = date.withYear(year + 1);
>> }
>> date = corrected;
>> }
>>
>> if (date == null) {
>> return 0;
>> }
>>
>> return date.getMillis();
>> }
>>
>> public static class Builder implements EventSerializer.Builder {
>>
>> @Override
>> public EventSerializer build(Context context, OutputStream
>> out) {
>> CustomLogAvroEventSerializer writer = null;
>> try {
>> writer = new CustomLogAvroEventSerializer(out);
>> writer.configure(context);
>> } catch (IOException e) {
>> logger.error("Unable to parse schema file. Exception
>> follows.", e);
>> }
>> return writer;
>> }
>>
>> }
>>
>>
>>
>> }
>>
>> Please suggest me i need output like this and i want to customize like
>> log4j
>>
>> -------------------------------------------------------------------------------------------------------------------------------------
>> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
>> 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
>> 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
>> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
>> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
>> 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
>> 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
>> 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
>>
>>
>> --
>> JP
>>
>>
>>
>> --
>> JP
>>
>>
>>
>
>
> --
> JP
>
>
>
--
JP
Re: Custom Serializer is not working
Posted by Mubarak Seyed <se...@apple.com>.
can you please try with
>> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
On Aug 2, 2012, at 8:39 PM, JP wrote:
> Thanks Mubarak,
>
> I have added the jar to lib .
>
> After that only im getting the exception.
>
> Any help to resolve this issue.
>
> On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <se...@apple.com> wrote:
> Do you have a custom jar which contains org.apache.flume.serialization.CustomLogAvroEventSerializer in flume classpath? You can copy the custom jar file to <FLUME_HOME>/lib directory.
>
> ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
>
> On Aug 2, 2012, at 8:26 PM, JP wrote:
>
>> HI,
>>
>> I'm getting errors
>>
>> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
>> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost 41414] OPEN
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost :41414] BOUND: / localhost :41414
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => / localhost :41414] CONNECTED: / localhost :3770
>> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
>> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>> at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>> at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> at java.lang.Thread.run(Thread.java:662)
>> Caused by: java.lang.NullPointerException
>> at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
>> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
>> ... 13 more
>> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
>> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>> at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>> at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> at java.lang.Thread.run(Thread.java:662)
>>
>> -----------------------------------------------------------------------------------------------------------------
>>
>>
>> This is my avro file
>>
>> { "type": "record", "name": "LogEvent", "namespace": "org.apache.flume.serialization",
>> "fields": [
>> { "name": "srno", "type": "int" },
>> { "name": "severity", "type": "int" },
>> { "name": "timestamp", "type": "long" },
>> { "name": "hostname", "type": "string" },
>> { "name": "message", "type": "string" }
>> ]
>> }
>>
>> ------------------------------------------------------------------------------------------------
>>
>> This is the LogEvent created using maven-avro and little customized
>>
>> @SuppressWarnings("all")
>> public class LogEvent extends SpecificRecordBase implements SpecificRecord {
>> public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
>> public int srno;
>> public String severity;
>> public long timestamp;
>> public String hostname;
>> public String message;
>>
>> public Schema getSchema() { return _SCHEMA; }
>> public Object get(int _field) {
>> switch (_field) {
>> case 0: return srno;
>> case 1: return severity;
>> case 2: return timestamp;
>> case 3: return hostname;
>> case 4: return message;
>> default: throw new AvroRuntimeException("Bad index");
>> }
>> }
>>
>> @SuppressWarnings(value="unchecked")
>> public void set(int _field, Object _value) {
>> switch (_field) {
>> case 0: srno = (Integer)_value; break;
>> case 1: severity = (String)_value; break;
>> case 2: timestamp = (Long)_value; break;
>> case 3: hostname = (String)_value; break;
>> case 4: message = (String)_value; break;
>> default: throw new AvroRuntimeException("Bad index");
>> }
>> }
>>
>> public void setSrno(int srno) {
>> this.srno = srno;
>> }
>> public void setSeverity(String s) {
>> severity = s;
>> }
>> public String getSeverity() {
>> return severity;
>> }
>>
>> public void setTimestamp(long t) {
>> timestamp = t;
>> }
>>
>> public long getTimestamp() {
>> return timestamp;
>> }
>>
>> public void setHostname(String h) {
>> hostname = h;
>> }
>>
>> public String getHostname() {
>> return hostname;
>> }
>>
>> public void setMessage(String m) {
>> message = m;
>> }
>>
>> public String getMessage() {
>> return message;
>> }
>>
>> @Override
>> public void put(int field, Object value) {
>> // TODO Auto-generated method stub
>>
>> }
>> }
>> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>> agent2.sources = seqGenSrc
>> agent2.channels = memoryChannel
>> agent2.sinks = loggerSink
>>
>>
>> agent2.sources.seqGenSrc.type = avro
>> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
>> agent2.sources.seqGenSrc.port=41414
>>
>> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
>> #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> agent2.channels.memoryChannel.type = memory
>> agent2.channels.memoryChannel.capacity = 1000000
>> agent2.channels.memoryChannel.transactionCapacity = 1000000
>> agent2.channels.memoryChannel.keep-alive = 30
>>
>> agent2.sources.seqGenSrc.channels = memoryChannel
>>
>> agent2.sinks.loggerSink.type = hdfs
>> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
>> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
>> agent2.sinks.loggerSink.hdfs.fileType = DataStream
>> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>>
>> agent2.sinks.loggerSink.channel = memoryChannel
>> #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
>> #agent2.sinks.loggerSink.serializer = avro_event
>> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
>> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
>> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
>> agent2.channels.memoryChannel.type = memory
>> ~
>>
>>
>> -----------------------------------------------------------------------------------------------------------------
>> The following is my class
>>
>> public class CustomLogAvroEventSerializer extends
>> AbstractAvroEventSerializer<LogEvent> {
>>
>> private static final DateTimeFormatter dateFmt1 =
>> DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>>
>> private static final DateTimeFormatter dateFmt2 =
>> DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
>>
>>
>> private static final Logger logger =
>> LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>>
>> private final OutputStream out;
>> private final Schema schema;
>>
>> public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
>> this.out = out;
>> this.schema =new LogEvent().getSchema();;
>> }
>>
>> @Override
>> protected OutputStream getOutputStream() {
>> return out;
>> }
>>
>> @Override
>> protected Schema getSchema() {
>> return schema;
>> }
>>
>> // very simple rfc3164 parser
>> @Override
>> protected LogEvent convert(Event event) {
>> LogEvent sle = new LogEvent();
>>
>> // Stringify body so it's easy to parse.
>> // This is a pretty inefficient way to do it.
>> String msg = new String(event.getBody(), Charsets.UTF_8);
>>
>> // parser read pointer
>> int seek = 0;
>>
>> // Check Flume headers to see if we came from SyslogTcp(or UDP) Source,
>> // which at the time of this writing only parses the priority.
>> // This is a bit schizophrenic and it should parse all the fields or none.
>> Map<String, String> headers = event.getHeaders();
>> boolean fromSyslogSource = false;
>> if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
>> fromSyslogSource = true;
>> int srno = Integer.parseInt(headers.get("srno"));
>> sle.setSrno(srno);
>> }else{
>> sle.setSrno(121);
>> }
>> if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
>> fromSyslogSource = true;
>> String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
>> sle.setSeverity(severity);
>> }
>>
>> // assume the message was received raw (maybe via NetcatSource)
>> // parse the priority string
>> if (!fromSyslogSource) {
>> if (msg.charAt(0) == '<') {
>> int end = msg.indexOf(">");
>> if (end > -1) {
>> seek = end + 1;
>> String priStr = msg.substring(1, end);
>> // int priority = Integer.parseInt(priStr);
>> // String severity = priStr;
>>
>> sle.setSeverity(priStr);
>> }
>> }
>> }
>>
>> // parse the timestamp
>> String timestampStr = msg.substring(seek, seek + 15);
>> long ts = parseRfc3164Date(timestampStr);
>> if (ts != 0) {
>> sle.setTimestamp(ts);
>> seek += 15 + 1; // space after timestamp
>> }
>>
>> // parse the hostname
>> int nextSpace = msg.indexOf(' ', seek);
>> if (nextSpace > -1) {
>> String hostname = msg.substring(seek, nextSpace);
>> sle.setHostname(hostname);
>> seek = nextSpace + 1;
>> }
>>
>> // everything else is the message
>> String actualMessage = msg.substring(seek);
>> sle.setMessage(actualMessage);
>>
>> logger.debug("Serialized event as: {}", sle);
>>
>> return sle;
>> }
>>
>> private static long parseRfc3164Date(String in) {
>> DateTime date = null;
>> try {
>> date = dateFmt1.parseDateTime(in);
>> } catch (IllegalArgumentException e) {
>> // ignore the exception, we act based on nullity of date object
>> logger.debug("Date parse failed on ({}), trying single-digit date", in);
>> }
>>
>> if (date == null) {
>> try {
>> date = dateFmt2.parseDateTime(in);
>> } catch (IllegalArgumentException e) {
>> // ignore the exception, we act based on nullity of date object
>> logger.debug("2nd date parse failed on ({}), unknown date format", in);
>> }
>> }
>>
>> // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
>> // rfc3164 dates are really dumb.
>> // NB: cannot handle replaying of old logs or going back to the future
>> if (date != null) {
>> DateTime now = new DateTime();
>> int year = now.getYear();
>> DateTime corrected = date.withYear(year);
>>
>> // flume clock is ahead or there is some latency, and the year rolled
>> if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
>> corrected = date.withYear(year - 1);
>> // flume clock is behind and the year rolled
>> } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
>> corrected = date.withYear(year + 1);
>> }
>> date = corrected;
>> }
>>
>> if (date == null) {
>> return 0;
>> }
>>
>> return date.getMillis();
>> }
>>
>> public static class Builder implements EventSerializer.Builder {
>>
>> @Override
>> public EventSerializer build(Context context, OutputStream out) {
>> CustomLogAvroEventSerializer writer = null;
>> try {
>> writer = new CustomLogAvroEventSerializer(out);
>> writer.configure(context);
>> } catch (IOException e) {
>> logger.error("Unable to parse schema file. Exception follows.", e);
>> }
>> return writer;
>> }
>>
>> }
>>
>>
>>
>> }
>>
>> Please suggest me i need output like this and i want to customize like log4j
>> -------------------------------------------------------------------------------------------------------------------------------------
>> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
>> 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
>> 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
>> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
>> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
>> 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
>> 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
>> 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
>>
>>
>> --
>> JP
>>
>>
>>
>> --
>> JP
>
>
>
>
> --
> JP
Re: Custom Serializer is not working
Posted by JP <jp...@gmail.com>.
Thanks Mubarak,
I have added the jar to the lib directory.
Even after adding it, I am still getting the exception.
Any help to resolve this issue would be appreciated.
On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <se...@apple.com> wrote:
> Do you have a custom jar which contains
> org.apache.flume.serialization.CustomLogAvroEventSerializer in flume
> classpath? You can copy the custom jar file to <FLUME_HOME>/lib directory.
>
> ERROR serialization.EventSerializerFactory: Unable to instantiate Builder
> from org.apache.flume.serialization.CustomLogAvroEventSerializer
>
> On Aug 2, 2012, at 8:26 PM, JP wrote:
>
> HI,
>
> Im getting errros
>
> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
> started.
> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a,
> /localhost=> / localhost 41414 <http://10.105.39.202:41414/>] OPEN
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a,
> /localhost=> / localhost :41414 <http://10.105.39.202:41414/>] BOUND: /
> localhost :41414 <http://10.105.39.202:41414/>
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost
> :3770 <http://10.77.235.245:3770/> => / localhost :41414<http://10.105.39.202:41414/>]
> CONNECTED: / localhost :3770 <http://10.77.235.245:3770/>
> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://
> localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable
> to instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
> ... 13 more
> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://
> localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable
> to instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at
> org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
>
>
> -----------------------------------------------------------------------------------------------------------------
>
>
> This is my avro file
>
> { "type": "record", "name": "LogEvent", "namespace":
> "org.apache.flume.serialization",
> "fields": [
> { "name": "srno", "type": "int" },
> { "name": "severity", "type": "int" },
> { "name": "timestamp", "type": "long" },
> { "name": "hostname", "type": "string" },
> { "name": "message", "type": "string" }
> ]
> }
>
>
> ------------------------------------------------------------------------------------------------
>
> This is the LogEvent created using maven-avro and little customized
>
> @SuppressWarnings("all")
> public class LogEvent extends SpecificRecordBase implements SpecificRecord
> {
> public static final Schema _SCHEMA =
> Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
> public int srno;
> public String severity;
> public long timestamp;
> public String hostname;
> public String message;
>
> public Schema getSchema() { return _SCHEMA; }
> public Object get(int _field) {
> switch (_field) {
> case 0: return srno;
> case 1: return severity;
> case 2: return timestamp;
> case 3: return hostname;
> case 4: return message;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> @SuppressWarnings(value="unchecked")
> public void set(int _field, Object _value) {
> switch (_field) {
> case 0: srno = (Integer)_value; break;
> case 1: severity = (String)_value; break;
> case 2: timestamp = (Long)_value; break;
> case 3: hostname = (String)_value; break;
> case 4: message = (String)_value; break;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> public void setSrno(int srno) {
> this.srno = srno;
> }
> public void setSeverity(String s) {
> severity = s;
> }
> public String getSeverity() {
> return severity;
> }
>
> public void setTimestamp(long t) {
> timestamp = t;
> }
>
> public long getTimestamp() {
> return timestamp;
> }
>
> public void setHostname(String h) {
> hostname = h;
> }
>
> public String getHostname() {
> return hostname;
> }
>
> public void setMessage(String m) {
> message = m;
> }
>
> public String getMessage() {
> return message;
> }
>
> @Override
> public void put(int field, Object value) {
> // TODO Auto-generated method stub
>
> }
> }
>
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
> agent2.sources = seqGenSrc
> agent2.channels = memoryChannel
> agent2.sinks = loggerSink
>
>
> agent2.sources.seqGenSrc.type = avro
> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
> agent2.sources.seqGenSrc.port=41414
>
> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type =
> org.apache.flume.interceptor.HostInterceptor$Builder
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
> = false
> #agent2.sources.seqGenSrc.interceptors.time.type =
> org.apache.flume.interceptor.TimestampInterceptor$Builder
>
> agent2.channels.memoryChannel.type = memory
> agent2.channels.memoryChannel.capacity = 1000000
> agent2.channels.memoryChannel.transactionCapacity = 1000000
> agent2.channels.memoryChannel.keep-alive = 30
>
> agent2.sources.seqGenSrc.channels = memoryChannel
>
> agent2.sinks.loggerSink.type = hdfs
> #agent2.sinks.loggerSink.hdfs.path = hdfs://
> 10.105.39.204:8020/data/CspcLogs
> agent2.sinks.loggerSink.hdfs.path =
> hdfs://slcso-poc4-lnx:8020/data/cssplogs
> agent2.sinks.loggerSink.hdfs.fileType = DataStream
> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>
> agent2.sinks.loggerSink.channel = memoryChannel
> #agent2.sinks.loggerSink.serializer =
> org.apache.flume.serialization.BodyTextEventSerializer
> #agent2.sinks.loggerSink.serializer = avro_event
> agent2.sinks.loggerSink.serializer =
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
> agent2.channels.memoryChannel.type = memory
> ~
>
>
>
> -----------------------------------------------------------------------------------------------------------------
> The following is my class
>
> public class CustomLogAvroEventSerializer extends
> AbstractAvroEventSerializer<LogEvent> {
>
> private static final DateTimeFormatter dateFmt1 =
> DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>
> private static final DateTimeFormatter dateFmt2 =
> DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
>
>
> private static final Logger logger =
> LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>
> private final OutputStream out;
> private final Schema schema;
>
> public CustomLogAvroEventSerializer(OutputStream out) throws
> IOException {
> this.out = out;
> this.schema =new LogEvent().getSchema();;
> }
>
> @Override
> protected OutputStream getOutputStream() {
> return out;
> }
>
> @Override
> protected Schema getSchema() {
> return schema;
> }
>
> // very simple rfc3164 parser
> @Override
> protected LogEvent convert(Event event) {
> LogEvent sle = new LogEvent();
>
> // Stringify body so it's easy to parse.
> // This is a pretty inefficient way to do it.
> String msg = new String(event.getBody(), Charsets.UTF_8);
>
> // parser read pointer
> int seek = 0;
>
> // Check Flume headers to see if we came from SyslogTcp(or UDP)
> Source,
> // which at the time of this writing only parses the priority.
> // This is a bit schizophrenic and it should parse all the fields
> or none.
> Map<String, String> headers = event.getHeaders();
> boolean fromSyslogSource = false;
> if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
> fromSyslogSource = true;
> int srno = Integer.parseInt(headers.get("srno"));
> sle.setSrno(srno);
> }else{
> sle.setSrno(121);
> }
> if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
> fromSyslogSource = true;
> String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
> sle.setSeverity(severity);
> }
>
> // assume the message was received raw (maybe via NetcatSource)
> // parse the priority string
> if (!fromSyslogSource) {
> if (msg.charAt(0) == '<') {
> int end = msg.indexOf(">");
> if (end > -1) {
> seek = end + 1;
> String priStr = msg.substring(1, end);
> // int priority = Integer.parseInt(priStr);
> // String severity = priStr;
>
> sle.setSeverity(priStr);
> }
> }
> }
>
> // parse the timestamp
> String timestampStr = msg.substring(seek, seek + 15);
> long ts = parseRfc3164Date(timestampStr);
> if (ts != 0) {
> sle.setTimestamp(ts);
> seek += 15 + 1; // space after timestamp
> }
>
> // parse the hostname
> int nextSpace = msg.indexOf(' ', seek);
> if (nextSpace > -1) {
> String hostname = msg.substring(seek, nextSpace);
> sle.setHostname(hostname);
> seek = nextSpace + 1;
> }
>
> // everything else is the message
> String actualMessage = msg.substring(seek);
> sle.setMessage(actualMessage);
>
> logger.debug("Serialized event as: {}", sle);
>
> return sle;
> }
>
> private static long parseRfc3164Date(String in) {
> DateTime date = null;
> try {
> date = dateFmt1.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date
> object
> logger.debug("Date parse failed on ({}), trying single-digit
> date", in);
> }
>
> if (date == null) {
> try {
> date = dateFmt2.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date
> object
> logger.debug("2nd date parse failed on ({}), unknown date
> format", in);
> }
> }
>
> // hacky stuff to try and deal with boundary cases, i.e. new
> year's eve.
> // rfc3164 dates are really dumb.
> // NB: cannot handle replaying of old logs or going back to
> the future
> if (date != null) {
> DateTime now = new DateTime();
> int year = now.getYear();
> DateTime corrected = date.withYear(year);
>
> // flume clock is ahead or there is some latency, and the
> year rolled
> if (corrected.isAfter(now) &&
> corrected.minusMonths(1).isAfter(now)) {
> corrected = date.withYear(year - 1);
> // flume clock is behind and the year rolled
> } else if (corrected.isBefore(now) &&
> corrected.plusMonths(1).isBefore(now)) {
> corrected = date.withYear(year + 1);
> }
> date = corrected;
> }
>
> if (date == null) {
> return 0;
> }
>
> return date.getMillis();
> }
>
> public static class Builder implements EventSerializer.Builder {
>
> @Override
> public EventSerializer build(Context context, OutputStream
> out) {
> CustomLogAvroEventSerializer writer = null;
> try {
> writer = new CustomLogAvroEventSerializer(out);
> writer.configure(context);
> } catch (IOException e) {
> logger.error("Unable to parse schema file. Exception
> follows.", e);
> }
> return writer;
> }
>
> }
>
>
>
> }
>
> Please suggest me i need output like this and i want to customize like
> log4j
>
> -------------------------------------------------------------------------------------------------------------------------------------
> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
> 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
> 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
> 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
>
>
> --
> JP
>
>
>
> --
> JP
>
>
>
--
JP
Re: Custom Serializer is not working
Posted by Mubarak Seyed <se...@apple.com>.
Do you have a custom jar which contains org.apache.flume.serialization.CustomLogAvroEventSerializer in flume classpath? You can copy the custom jar file to <FLUME_HOME>/lib directory.
ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
On Aug 2, 2012, at 8:26 PM, JP wrote:
> HI,
>
> Im getting errros
>
> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost 41414] OPEN
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost :41414] BOUND: / localhost :41414
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => / localhost :41414] CONNECTED: / localhost :3770
> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp
> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
> at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
> ... 13 more
> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp
> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
>
> -----------------------------------------------------------------------------------------------------------------
>
>
> This is my avro file
>
> { "type": "record", "name": "LogEvent", "namespace": "org.apache.flume.serialization",
> "fields": [
> { "name": "srno", "type": "int" },
> { "name": "severity", "type": "int" },
> { "name": "timestamp", "type": "long" },
> { "name": "hostname", "type": "string" },
> { "name": "message", "type": "string" }
> ]
> }
>
> ------------------------------------------------------------------------------------------------
>
> This is the LogEvent created using maven-avro and little customized
>
> @SuppressWarnings("all")
> public class LogEvent extends SpecificRecordBase implements SpecificRecord {
> public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
> public int srno;
> public String severity;
> public long timestamp;
> public String hostname;
> public String message;
>
> public Schema getSchema() { return _SCHEMA; }
> public Object get(int _field) {
> switch (_field) {
> case 0: return srno;
> case 1: return severity;
> case 2: return timestamp;
> case 3: return hostname;
> case 4: return message;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> @SuppressWarnings(value="unchecked")
> public void set(int _field, Object _value) {
> switch (_field) {
> case 0: srno = (Integer)_value; break;
> case 1: severity = (String)_value; break;
> case 2: timestamp = (Long)_value; break;
> case 3: hostname = (String)_value; break;
> case 4: message = (String)_value; break;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> public void setSrno(int srno) {
> this.srno = srno;
> }
> public void setSeverity(String s) {
> severity = s;
> }
> public String getSeverity() {
> return severity;
> }
>
> public void setTimestamp(long t) {
> timestamp = t;
> }
>
> public long getTimestamp() {
> return timestamp;
> }
>
> public void setHostname(String h) {
> hostname = h;
> }
>
> public String getHostname() {
> return hostname;
> }
>
> public void setMessage(String m) {
> message = m;
> }
>
> public String getMessage() {
> return message;
> }
>
> @Override
> public void put(int field, Object value) {
> // TODO Auto-generated method stub
>
> }
> }
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
> agent2.sources = seqGenSrc
> agent2.channels = memoryChannel
> agent2.sinks = loggerSink
>
>
> agent2.sources.seqGenSrc.type = avro
> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
> agent2.sources.seqGenSrc.port=41414
>
> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
> #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>
> agent2.channels.memoryChannel.type = memory
> agent2.channels.memoryChannel.capacity = 1000000
> agent2.channels.memoryChannel.transactionCapacity = 1000000
> agent2.channels.memoryChannel.keep-alive = 30
>
> agent2.sources.seqGenSrc.channels = memoryChannel
>
> agent2.sinks.loggerSink.type = hdfs
> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
> agent2.sinks.loggerSink.hdfs.fileType = DataStream
> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>
> agent2.sinks.loggerSink.channel = memoryChannel
> #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
> #agent2.sinks.loggerSink.serializer = avro_event
> # Flume's EventSerializerFactory needs the Builder class itself here,
> # otherwise it fails with "Unable to instantiate Builder from ...":
> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
> agent2.channels.memoryChannel.type = memory
> ~
>
>
> -----------------------------------------------------------------------------------------------------------------
> The following is my class
>
> public class CustomLogAvroEventSerializer extends
> AbstractAvroEventSerializer<LogEvent> {
>
> private static final DateTimeFormatter dateFmt1 =
> DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>
> private static final DateTimeFormatter dateFmt2 =
> DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
>
>
> private static final Logger logger =
> LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>
> private final OutputStream out;
> private final Schema schema;
>
> public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
> this.out = out;
> this.schema =new LogEvent().getSchema();;
> }
>
> @Override
> protected OutputStream getOutputStream() {
> return out;
> }
>
> @Override
> protected Schema getSchema() {
> return schema;
> }
>
> // very simple rfc3164 parser
> @Override
> protected LogEvent convert(Event event) {
> LogEvent sle = new LogEvent();
>
> // Stringify body so it's easy to parse.
> // This is a pretty inefficient way to do it.
> String msg = new String(event.getBody(), Charsets.UTF_8);
>
> // parser read pointer
> int seek = 0;
>
> // Check Flume headers to see if we came from SyslogTcp(or UDP) Source,
> // which at the time of this writing only parses the priority.
> // This is a bit schizophrenic and it should parse all the fields or none.
> Map<String, String> headers = event.getHeaders();
> boolean fromSyslogSource = false;
> if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
> fromSyslogSource = true;
> int srno = Integer.parseInt(headers.get("srno"));
> sle.setSrno(srno);
> }else{
> sle.setSrno(121);
> }
> if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
> fromSyslogSource = true;
> String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
> sle.setSeverity(severity);
> }
>
> // assume the message was received raw (maybe via NetcatSource)
> // parse the priority string
> if (!fromSyslogSource) {
> if (msg.charAt(0) == '<') {
> int end = msg.indexOf(">");
> if (end > -1) {
> seek = end + 1;
> String priStr = msg.substring(1, end);
> // int priority = Integer.parseInt(priStr);
> // String severity = priStr;
>
> sle.setSeverity(priStr);
> }
> }
> }
>
> // parse the timestamp
> String timestampStr = msg.substring(seek, seek + 15);
> long ts = parseRfc3164Date(timestampStr);
> if (ts != 0) {
> sle.setTimestamp(ts);
> seek += 15 + 1; // space after timestamp
> }
>
> // parse the hostname
> int nextSpace = msg.indexOf(' ', seek);
> if (nextSpace > -1) {
> String hostname = msg.substring(seek, nextSpace);
> sle.setHostname(hostname);
> seek = nextSpace + 1;
> }
>
> // everything else is the message
> String actualMessage = msg.substring(seek);
> sle.setMessage(actualMessage);
>
> logger.debug("Serialized event as: {}", sle);
>
> return sle;
> }
>
> private static long parseRfc3164Date(String in) {
> DateTime date = null;
> try {
> date = dateFmt1.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date object
> logger.debug("Date parse failed on ({}), trying single-digit date", in);
> }
>
> if (date == null) {
> try {
> date = dateFmt2.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date object
> logger.debug("2nd date parse failed on ({}), unknown date format", in);
> }
> }
>
> // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
> // rfc3164 dates are really dumb.
> // NB: cannot handle replaying of old logs or going back to the future
> if (date != null) {
> DateTime now = new DateTime();
> int year = now.getYear();
> DateTime corrected = date.withYear(year);
>
> // flume clock is ahead or there is some latency, and the year rolled
> if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
> corrected = date.withYear(year - 1);
> // flume clock is behind and the year rolled
> } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
> corrected = date.withYear(year + 1);
> }
> date = corrected;
> }
>
> if (date == null) {
> return 0;
> }
>
> return date.getMillis();
> }
>
> public static class Builder implements EventSerializer.Builder {
>
> @Override
> public EventSerializer build(Context context, OutputStream out) {
> CustomLogAvroEventSerializer writer = null;
> try {
> writer = new CustomLogAvroEventSerializer(out);
> writer.configure(context);
> } catch (IOException e) {
> logger.error("Unable to parse schema file. Exception follows.", e);
> }
> return writer;
> }
>
> }
>
>
>
> }
>
> Please advise — I need output like the sample below, and I want to format it the way log4j does:
> -------------------------------------------------------------------------------------------------------------------------------------
> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
> 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
> 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
> 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
>
>
> --
> JP
>
>
>
> --
> JP