Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/18 20:19:48 UTC

git commit: FLUME-2010. Support Avro records in Log4jAppender and the HDFS Sink.

Updated Branches:
  refs/heads/trunk cf6298415 -> e048160e0


FLUME-2010. Support Avro records in Log4jAppender and the HDFS Sink.

(Tom White via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e048160e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e048160e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e048160e

Branch: refs/heads/trunk
Commit: e048160e043e4aa57fc49cc33f2640e60677cbe5
Parents: cf62984
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jun 18 11:17:39 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Jun 18 11:18:30 2013 -0700

----------------------------------------------------------------------
 .../clients/log4jappender/Log4jAppender.java    |  73 ++++++-
 .../clients/log4jappender/Log4jAvroHeaders.java |   4 +-
 .../TestLog4jAppenderWithAvro.java              | 195 ++++++++++++++++++
 .../flume-log4jtest-avro-generic.properties     |  21 ++
 .../flume-log4jtest-avro-reflect.properties     |  21 ++
 .../src/test/resources/myrecord.avsc            |   1 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  79 ++++++--
 .../flume/sink/hdfs/AvroEventSerializer.java    | 197 +++++++++++++++++++
 .../sink/hdfs/TestAvroEventSerializer.java      | 156 +++++++++++++++
 9 files changed, 717 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
index 532b761..b07b189 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAppender.java
@@ -18,11 +18,21 @@
  */
 package org.apache.flume.clients.log4jappender;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -67,6 +77,8 @@ public class Log4jAppender extends AppenderSkeleton {
   private boolean unsafeMode = false;
   private long timeout = RpcClientConfigurationConstants
     .DEFAULT_REQUEST_TIMEOUT_MILLIS;
+  private boolean avroReflectionEnabled;
+  private String avroSchemaUrl;
 
   RpcClient rpcClient = null;
 
@@ -130,18 +142,23 @@ public class Log4jAppender extends AppenderSkeleton {
     //Log4jAvroHeaders.LOG_LEVEL.toString()))
     hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
         String.valueOf(event.getLevel().toInt()));
-    hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
 
-    String message = null;
-    if(this.layout != null) {
-      message = this.layout.format(event);
+    Event flumeEvent;
+    Object message = event.getMessage();
+    if (message instanceof GenericRecord) {
+      GenericRecord record = (GenericRecord) message;
+      populateAvroHeaders(hdrs, record.getSchema(), message);
+      flumeEvent = EventBuilder.withBody(serialize(record, record.getSchema()), hdrs);
+    } else if (message instanceof SpecificRecord || avroReflectionEnabled) {
+      Schema schema = ReflectData.get().getSchema(message.getClass());
+      populateAvroHeaders(hdrs, schema, message);
+      flumeEvent = EventBuilder.withBody(serialize(message, schema), hdrs);
     } else {
-      message = event.getMessage().toString();
+      hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");
+      String msg = layout != null ? layout.format(event) : message.toString();
+      flumeEvent = EventBuilder.withBody(msg, Charset.forName("UTF8"), hdrs);
     }
 
-    Event flumeEvent = EventBuilder.withBody(
-        message, Charset.forName("UTF8"), hdrs);
-
     try {
       rpcClient.append(flumeEvent);
     } catch (EventDeliveryException e) {
@@ -154,6 +171,39 @@ public class Log4jAppender extends AppenderSkeleton {
     }
   }
 
+  private Schema schema;
+  private ByteArrayOutputStream out;
+  private DatumWriter<Object> writer;
+  private BinaryEncoder encoder;
+
+  protected void populateAvroHeaders(Map<String, String> hdrs, Schema schema,
+      Object message) {
+    if (avroSchemaUrl != null) {
+      hdrs.put(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString(), avroSchemaUrl);
+      return;
+    }
+    LogLog.warn("Cannot find ID for schema. Adding header for schema, " +
+        "which may be inefficient. Consider setting up an Avro Schema Cache.");
+    hdrs.put(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString(), schema.toString());
+  }
+
+  private byte[] serialize(Object datum, Schema datumSchema) throws FlumeException {
+    if (schema == null || !datumSchema.equals(schema)) {
+      schema = datumSchema;
+      out = new ByteArrayOutputStream();
+      writer = new ReflectDatumWriter<Object>(schema);
+      encoder = EncoderFactory.get().binaryEncoder(out, null);
+    }
+    out.reset();
+    try {
+      writer.write(datum, encoder);
+      encoder.flush();
+      return out.toByteArray();
+    } catch (IOException e) {
+      throw new FlumeException(e);
+    }
+  }
+
   //This function should be synchronized to make sure one thread
   //does not close an appender another thread is using, and hence risking
   //a null pointer exception.
@@ -229,6 +279,13 @@ public class Log4jAppender extends AppenderSkeleton {
     return this.timeout;
   }
 
+  public void setAvroReflectionEnabled(boolean avroReflectionEnabled) {
+    this.avroReflectionEnabled = avroReflectionEnabled;
+  }
+
+  public void setAvroSchemaUrl(String avroSchemaUrl) {
+    this.avroSchemaUrl = avroSchemaUrl;
+  }
 
   /**
    * Activate the options set using <tt>setPort()</tt>

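With this change, an application can hand an Avro record straight to Log4j and the
appender serializes it as Avro binary instead of formatting it as a string. For
illustration, a minimal sketch of client code logging a GenericRecord; it assumes the
myrecord.avsc schema resource and an appender configured as in the tests below, and
the class name is illustrative only:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.log4j.Logger;

    public class AvroLoggingExample {
      public static void main(String[] args) throws Exception {
        // Parse the record schema shipped as a test resource in this commit.
        Schema schema = new Schema.Parser().parse(
            AvroLoggingExample.class.getClassLoader()
                .getResourceAsStream("myrecord.avsc"));

        // Any GenericRecord handed to Log4j is serialized by the patched
        // appender using a ReflectDatumWriter for the record's schema.
        GenericRecord record = new GenericRecordBuilder(schema)
            .set("message", "hello from avro")
            .build();

        // The appender adds either the flume.avro.schema.url header (when
        // AvroSchemaUrl is configured) or flume.avro.schema.literal.
        Logger.getLogger(AvroLoggingExample.class).info(record);
      }
    }
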
http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
index a6216c3..08a7203 100644
--- a/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
+++ b/flume-ng-clients/flume-ng-log4jappender/src/main/java/org/apache/flume/clients/log4jappender/Log4jAvroHeaders.java
@@ -23,7 +23,9 @@ public enum Log4jAvroHeaders {
   LOGGER_NAME("flume.client.log4j.logger.name"),
   LOG_LEVEL("flume.client.log4j.log.level"),
   MESSAGE_ENCODING("flume.client.log4j.message.encoding"),
-  TIMESTAMP("flume.client.log4j.timestamp");
+  TIMESTAMP("flume.client.log4j.timestamp"),
+  AVRO_SCHEMA_LITERAL("flume.avro.schema.literal"),
+  AVRO_SCHEMA_URL("flume.avro.schema.url");
 
   private String headerName;
   private Log4jAvroHeaders(String headerName){

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
new file mode 100644
index 0000000..5899c62
--- /dev/null
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/java/org/apache/flume/clients/log4jappender/TestLog4jAppenderWithAvro.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.clients.log4jappender;
+
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import junit.framework.Assert;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.source.AvroSource;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLog4jAppenderWithAvro {
+  private AvroSource source;
+  private Channel ch;
+  private Properties props;
+
+  @Before
+  public void setUp() throws Exception {
+    URL schemaUrl = getClass().getClassLoader().getResource("myrecord.avsc");
+    Files.copy(Resources.newInputStreamSupplier(schemaUrl),
+        new File("/tmp/myrecord.avsc"));
+
+    int port = 25430;
+    source = new AvroSource();
+    ch = new MemoryChannel();
+    Configurables.configure(ch, new Context());
+
+    Context context = new Context();
+    context.put("port", String.valueOf(port));
+    context.put("bind", "localhost");
+    Configurables.configure(source, context);
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(ch);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    source.start();
+  }
+
+  private void loadProperties(String file) throws IOException {
+    File TESTFILE = new File(
+        TestLog4jAppenderWithAvro.class.getClassLoader()
+            .getResource(file).getFile());
+    FileReader reader = new FileReader(TESTFILE);
+    props = new Properties();
+    props.load(reader);
+    reader.close();
+  }
+
+  @Test
+  public void testAvroGeneric() throws IOException {
+    loadProperties("flume-log4jtest-avro-generic.properties");
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(TestLog4jAppenderWithAvro.class);
+    String msg = "This is log message number " + String.valueOf(0);
+
+    Schema schema = new Schema.Parser().parse(
+        getClass().getClassLoader().getResource("myrecord.avsc").openStream());
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    GenericRecord record = builder.set("message", msg).build();
+
+    logger.info(record);
+
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
+    Event event = ch.take();
+    Assert.assertNotNull(event);
+
+    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
+    GenericRecord recordFromEvent = reader.read(null, decoder);
+    Assert.assertEquals(msg, recordFromEvent.get("message").toString());
+
+    Map<String, String> hdrs = event.getHeaders();
+
+    Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
+
+    Assert.assertEquals("Schema URL should be set",
+        "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString
+        ()));
+    Assert.assertNull("Schema string should not be set",
+        hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
+
+    transaction.commit();
+    transaction.close();
+
+  }
+
+  @Test
+  public void testAvroReflect() throws IOException {
+    loadProperties("flume-log4jtest-avro-reflect.properties");
+    PropertyConfigurator.configure(props);
+    Logger logger = LogManager.getLogger(TestLog4jAppenderWithAvro.class);
+    String msg = "This is log message number " + String.valueOf(0);
+
+    AppEvent appEvent = new AppEvent();
+    appEvent.setMessage(msg);
+
+    logger.info(appEvent);
+
+    Transaction transaction = ch.getTransaction();
+    transaction.begin();
+    Event event = ch.take();
+    Assert.assertNotNull(event);
+
+    Schema schema = ReflectData.get().getSchema(appEvent.getClass());
+
+    ReflectDatumReader<AppEvent> reader = new ReflectDatumReader<AppEvent>(AppEvent.class);
+    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
+    AppEvent recordFromEvent = reader.read(null, decoder);
+    Assert.assertEquals(msg, recordFromEvent.getMessage());
+
+    Map<String, String> hdrs = event.getHeaders();
+
+    Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
+
+    Assert.assertNull("Schema URL should not be set",
+        hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
+    Assert.assertEquals("Schema string should be set", schema.toString(),
+        hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));
+
+    transaction.commit();
+    transaction.close();
+
+  }
+
+  @After
+  public void cleanUp(){
+    source.stop();
+    ch.stop();
+    props.clear();
+  }
+
+  public static class AppEvent {
+    private String message;
+
+    public String getMessage() {
+      return message;
+    }
+
+    public void setMessage(String message) {
+      this.message = message;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties
new file mode 100644
index 0000000..ffdab8b
--- /dev/null
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-generic.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
+log4j.appender.out2.Port = 25430
+log4j.appender.out2.Hostname = localhost
+log4j.appender.out2.AvroSchemaUrl = file:///tmp/myrecord.avsc
+log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties
new file mode 100644
index 0000000..b50ffcc
--- /dev/null
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/flume-log4jtest-avro-reflect.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
+log4j.appender.out2.Port = 25430
+log4j.appender.out2.Hostname = localhost
+log4j.appender.out2.AvroReflectionEnabled = true
+log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-clients/flume-ng-log4jappender/src/test/resources/myrecord.avsc
----------------------------------------------------------------------
diff --git a/flume-ng-clients/flume-ng-log4jappender/src/test/resources/myrecord.avsc b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/myrecord.avsc
new file mode 100644
index 0000000..54130a3
--- /dev/null
+++ b/flume-ng-clients/flume-ng-log4jappender/src/test/resources/myrecord.avsc
@@ -0,0 +1 @@
+{"type":"record","name":"myrecord","fields":[{"name":"message","type":"string"}]}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 4f13d32..a8f84e5 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1449,7 +1449,7 @@ hdfs.fileType           SequenceFile  File format: currently ``SequenceFile``, `
                                       (2)CompressedStream requires set hdfs.codeC with an available codeC
 hdfs.maxOpenFiles       5000          Allow only this number of open files. If this number is exceeded, the oldest file is closed.
 hdfs.minBlockReplicas   --            Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
-hdfs.writeFormat        --            "Text" or "Writable"
+hdfs.writeFormat        --            Format for sequence file records. One of "Text" or "Writable" (the default).
 hdfs.callTimeout        10000         Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.
                                       This number should be increased if many HDFS timeout operations are occurring.
 hdfs.threadsPoolSize    10            Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
@@ -2607,14 +2607,18 @@ appender must have the flume-ng-sdk in the classpath (eg,
 flume-ng-sdk-1.4.0-SNAPSHOT.jar).
 Required properties are in **bold**.
 
-=============  =======  ==========================================================================
-Property Name  Default  Description
-=============  =======  ==========================================================================
-Hostname       --       The hostname on which a remote Flume agent is running with an avro source.
-Port           --       The port at which the remote Flume agent's avro source is listening.
-UnsafeMode     false    If true, the appender will not throw exceptions on failure to send the events.
-=============  =======  ==========================================================================
-
+=====================  =======  ==============================================================
+Property Name          Default  Description
+=====================  =======  ==============================================================
+**Hostname**           --       The hostname on which a remote Flume agent is running with an
+                                avro source.
+**Port**               --       The port at which the remote Flume agent's avro source is
+                                listening.
+UnsafeMode             false    If true, the appender will not throw exceptions on failure to
+                                send the events.
+AvroReflectionEnabled  false    Use Avro Reflection to serialize Log4j events.
+AvroSchemaUrl          --       A URL from which the Avro schema can be retrieved.
+=====================  =======  ==============================================================
 
 Sample log4j.properties file:
 
@@ -2630,6 +2634,35 @@ Sample log4j.properties file:
   log4j.logger.org.example.MyClass = DEBUG,flume
   #...
 
+By default, each event is converted to a string by calling ``toString()``,
+or by using the Log4j layout, if one is specified.
+
+If the event is an instance of ``org.apache.avro.generic.GenericRecord`` or
+``org.apache.avro.specific.SpecificRecord``, or if the property
+``AvroReflectionEnabled`` is set to ``true``, then the event will be serialized
+using Avro serialization.
+
+Serializing every event with its Avro schema is inefficient, so it is good practice to
+provide a schema URL from which the schema can be retrieved by the downstream sink,
+typically the HDFS sink. If ``AvroSchemaUrl`` is not specified,
+then the schema will be included as a Flume header.
+
+Sample log4j.properties file configured to use Avro serialization:
+
+.. code-block:: properties
+
+  #...
+  log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
+  log4j.appender.flume.Hostname = example.com
+  log4j.appender.flume.Port = 41414
+  log4j.appender.flume.AvroReflectionEnabled = true
+  log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc
+
+  # configure a class's logger to output to the flume appender
+  log4j.logger.org.example.MyClass = DEBUG,flume
+  #...
+
+
 Load Balancing Log4J Appender
 =============================
 
@@ -2640,18 +2673,22 @@ scheme for performing the load balancing. It also supports a configurable backof
 timeout so that down agents are removed temporarily from the set of hosts
 Required properties are in **bold**.
 
-=============  ===========  ==========================================================================
-Property Name  Default      Description
-=============  ===========  ==========================================================================
-**Hosts**      --           A space-separated list of host:port
-                            at which Flume (through an AvroSource) is listening for events
-Selector       ROUND_ROBIN  Selection mechanism. Must be either ROUND_ROBIN,
-                            RANDOM or custom FQDN to class that inherits from LoadBalancingSelector.
-MaxBackoff     --           A long value representing the maximum amount of time in milliseconds
-                            the Load balancing client will backoff from a node that has failed to
-                            consume an event. Defaults to no backoff
-UnsafeMode     false        If true, the appender will not throw exceptions on failure to send the events.
-=============  ===========  ==========================================================================
+=====================  ===========  ==============================================================
+Property Name          Default      Description
+=====================  ===========  ==============================================================
+**Hosts**              --           A space-separated list of host:port at which Flume (through
+                                    an AvroSource) is listening for events
+Selector               ROUND_ROBIN  Selection mechanism. Must be either ROUND_ROBIN,
+                                    RANDOM or custom FQDN to class that inherits from
+                                    LoadBalancingSelector.
+MaxBackoff             --           A long value representing the maximum amount of time in
+                                    milliseconds the Load balancing client will backoff from a
+                                    node that has failed to consume an event. Defaults to no backoff
+UnsafeMode             false        If true, the appender will not throw exceptions on failure to
+                                    send the events.
+AvroReflectionEnabled  false        Use Avro Reflection to serialize Log4j events.
+AvroSchemaUrl          --           A URL from which the Avro schema can be retrieved.
+=====================  ===========  ==============================================================
 
 
 Sample log4j.properties file configured using defaults:

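When AvroReflectionEnabled is set, as in the Avro sample in the user guide changes
above, a plain Java bean can be logged directly and the appender derives its schema
through Avro reflection. A short sketch mirroring the AppEvent bean used in the new
TestLog4jAppenderWithAvro test; the surrounding class and message text are
illustrative only:

    import org.apache.log4j.Logger;

    public class ReflectLoggingExample {
      // Same shape as the AppEvent bean in TestLog4jAppenderWithAvro; Avro
      // reflection derives a record schema with a single string field.
      public static class AppEvent {
        private String message;
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
      }

      public static void main(String[] args) {
        AppEvent event = new AppEvent();
        event.setMessage("hello via reflection");
        // With AvroReflectionEnabled=true, the appender serializes this
        // object with ReflectDatumWriter rather than calling toString().
        Logger.getLogger(ReflectLoggingExample.class).info(event);
      }
    }
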
http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
new file mode 100644
index 0000000..4b8fc78
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.serialization.EventSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flume.serialization.AvroEventSerializerConfigurationConstants.*;
+
+/**
+ * <p>
+ * This class serializes Flume {@linkplain org.apache.flume.Event events} into Avro data files. The
+ * Flume event body is read as an Avro datum, and is then written to the
+ * {@link org.apache.flume.serialization.EventSerializer}'s output stream in Avro data file format.
+ * </p>
+ * <p>
+ * The Avro schema is determined by reading a Flume event header. The schema may be
+ * specified either as a literal, by setting {@link #AVRO_SCHEMA_LITERAL_HEADER} (not
+ * recommended, since the full schema must be transmitted in every event),
+ * or as a URL from which the schema may be read, by setting {@link
+ * #AVRO_SCHEMA_URL_HEADER}. Schemas read from URLs are cached by instances of this
+ * class so that the overhead of retrieval is minimized.
+ * </p>
+ */
+public class AvroEventSerializer implements EventSerializer, Configurable {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AvroEventSerializer.class);
+
+  public static final String AVRO_SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
+  public static final String AVRO_SCHEMA_URL_HEADER = "flume.avro.schema.url";
+
+  private final OutputStream out;
+  private DatumWriter<Object> writer = null;
+  private DataFileWriter<Object> dataFileWriter = null;
+
+  private int syncIntervalBytes;
+  private String compressionCodec;
+  private Map<String, Schema> schemaCache = new HashMap<String, Schema>();
+
+  private AvroEventSerializer(OutputStream out) {
+    this.out = out;
+  }
+
+  @Override
+  public void configure(Context context) {
+    syncIntervalBytes =
+        context.getInteger(SYNC_INTERVAL_BYTES, DEFAULT_SYNC_INTERVAL_BYTES);
+    compressionCodec =
+        context.getString(COMPRESSION_CODEC, DEFAULT_COMPRESSION_CODEC);
+  }
+
+  @Override
+  public void afterCreate() throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void afterReopen() throws IOException {
+    // impossible to initialize DataFileWriter without writing the schema?
+    throw new UnsupportedOperationException("Avro API doesn't support append");
+  }
+
+  @Override
+  public void write(Event event) throws IOException {
+    if (dataFileWriter == null) {
+      initialize(event);
+    }
+    dataFileWriter.appendEncoded(ByteBuffer.wrap(event.getBody()));
+  }
+
+  private void initialize(Event event) throws IOException {
+    Schema schema = null;
+    String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
+    if (schemaUrl != null) {
+      schema = schemaCache.get(schemaUrl);
+      if (schema == null) {
+        schema = loadFromUrl(schemaUrl);
+        schemaCache.put(schemaUrl, schema);
+      }
+    }
+    if (schema == null) {
+      String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
+      if (schemaString == null) {
+        throw new FlumeException("Could not find schema for event " + event);
+      }
+      schema = new Schema.Parser().parse(schemaString);
+    }
+
+    writer = new GenericDatumWriter<Object>(schema);
+    dataFileWriter = new DataFileWriter<Object>(writer);
+
+    dataFileWriter.setSyncInterval(syncIntervalBytes);
+
+    try {
+      CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
+      dataFileWriter.setCodec(codecFactory);
+    } catch (AvroRuntimeException e) {
+      logger.warn("Unable to instantiate avro codec with name (" +
+          compressionCodec + "). Compression disabled. Exception follows.", e);
+    }
+
+    dataFileWriter.create(schema, out);
+  }
+
+  private Schema loadFromUrl(String schemaUrl) throws IOException {
+    Configuration conf = new Configuration();
+    Schema.Parser parser = new Schema.Parser();
+    if (schemaUrl.toLowerCase().startsWith("hdfs://")) {
+      FileSystem fs = FileSystem.get(conf);
+      FSDataInputStream input = null;
+      try {
+        input = fs.open(new Path(schemaUrl));
+        return parser.parse(input);
+      } finally {
+        if (input != null) {
+          input.close();
+        }
+      }
+    } else {
+      InputStream is = null;
+      try {
+        is = new URL(schemaUrl).openStream();
+        return parser.parse(is);
+      } finally {
+        if (is != null) {
+          is.close();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    dataFileWriter.flush();
+  }
+
+  @Override
+  public void beforeClose() throws IOException {
+    // no-op
+  }
+
+  @Override
+  public boolean supportsReopen() {
+    return false;
+  }
+
+  public static class Builder implements EventSerializer.Builder {
+
+    @Override
+    public EventSerializer build(Context context, OutputStream out) {
+      AvroEventSerializer writer = new AvroEventSerializer(out);
+      writer.configure(context);
+      return writer;
+    }
+
+  }
+
+}
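
The serializer expects each event body to already be Avro-encoded binary matching the
schema named in the event headers; it wraps the bytes with appendEncoded() rather than
re-encoding them. The files it produces are ordinary Avro data files, so they can be
read back with no out-of-band schema. A minimal sketch, assuming a file named
events.avro (a placeholder path) written by the HDFS sink with this serializer:

    import java.io.File;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class ReadAvroOutputExample {
      public static void main(String[] args) throws Exception {
        // "events.avro" stands in for a file the sink has rolled.
        File file = new File("events.avro");

        // The writer schema is embedded in the data file header, so the
        // reader needs no schema of its own.
        DataFileReader<GenericRecord> fileReader =
            new DataFileReader<GenericRecord>(
                file, new GenericDatumReader<GenericRecord>());
        GenericRecord record = null;
        while (fileReader.hasNext()) {
          record = fileReader.next(record); // reuse record between iterations
          System.out.println(record);
        }
        fileReader.close();
      }
    }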

http://git-wip-us.apache.org/repos/asf/flume/blob/e048160e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
new file mode 100644
index 0000000..38af74d
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestAvroEventSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.serialization.EventSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAvroEventSerializer {
+
+  private File file;
+
+  @Before
+  public void setUp() throws Exception {
+    file = File.createTempFile(getClass().getSimpleName(), "");
+  }
+
+  @Test
+  public void testNoCompression() throws IOException {
+    createAvroFile(file, null, false);
+    validateAvroFile(file);
+  }
+
+  @Test
+  public void testNullCompression() throws IOException {
+    createAvroFile(file, "null", false);
+    validateAvroFile(file);
+  }
+
+  @Test
+  public void testDeflateCompression() throws IOException {
+    createAvroFile(file, "deflate", false);
+    validateAvroFile(file);
+  }
+
+  @Test
+  public void testSnappyCompression() throws IOException {
+    createAvroFile(file, "snappy", false);
+    validateAvroFile(file);
+  }
+
+  @Test
+  public void testSchemaUrl() throws IOException {
+    createAvroFile(file, null, true);
+    validateAvroFile(file);
+  }
+
+  public void createAvroFile(File file, String codec, boolean useSchemaUrl) throws
+      IOException {
+
+    // serialize a few events using the reflection-based avro serializer
+    OutputStream out = new FileOutputStream(file);
+
+    Context ctx = new Context();
+    if (codec != null) {
+      ctx.put("compressionCodec", codec);
+    }
+
+    Schema schema = Schema.createRecord("myrecord", null, null, false);
+    schema.setFields(Arrays.asList(new Schema.Field[]{
+        new Schema.Field("message", Schema.create(Schema.Type.STRING), null, null)
+    }));
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+    File schemaFile = null;
+    if (useSchemaUrl) {
+      schemaFile = File.createTempFile(getClass().getSimpleName(), ".avsc");
+      Files.write(schema.toString(), schemaFile, Charsets.UTF_8);
+    }
+
+    EventSerializer.Builder builder = new AvroEventSerializer.Builder();
+    EventSerializer serializer = builder.build(ctx, out);
+
+    serializer.afterCreate();
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = recordBuilder.set("message", "Hello " + i).build();
+      Event event = EventBuilder.withBody(serializeAvro(record, schema));
+      if (schemaFile == null) {
+        event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_LITERAL_HEADER,
+            schema.toString());
+      } else {
+        event.getHeaders().put(AvroEventSerializer.AVRO_SCHEMA_URL_HEADER,
+            schemaFile.toURI().toURL().toExternalForm());
+      }
+      serializer.write(event);
+    }
+    serializer.flush();
+    serializer.beforeClose();
+    out.flush();
+    out.close();
+  }
+
+  private byte[] serializeAvro(Object datum, Schema schema) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
+    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    out.reset();
+    writer.write(datum, encoder);
+    encoder.flush();
+    return out.toByteArray();
+  }
+
+  public void validateAvroFile(File file) throws IOException {
+    // read the events back using GenericRecord
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
+    DataFileReader<GenericRecord> fileReader =
+        new DataFileReader<GenericRecord>(file, reader);
+    GenericRecord record = new GenericData.Record(fileReader.getSchema());
+    int numEvents = 0;
+    while (fileReader.hasNext()) {
+      fileReader.next(record);
+      String bodyStr = record.get("message").toString();
+      System.out.println(bodyStr);
+      numEvents++;
+    }
+    fileReader.close();
+    Assert.assertEquals("Should have found a total of 3 events", 3, numEvents);
+  }
+}