You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/03/08 01:54:35 UTC

git commit: FLUME-1915. Deflate compression support for the AvroSource, AvroSink, and Avro RPC Client.

Updated Branches:
  refs/heads/trunk d10a6bd1e -> e72e559ba


FLUME-1915. Deflate compression support for the AvroSource, AvroSink, and Avro RPC Client.

(Ted Malaska via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e72e559b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e72e559b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e72e559b

Branch: refs/heads/trunk
Commit: e72e559baa589787bcba233e29882e28e0ff43ef
Parents: d10a6bd
Author: Mike Percy <mp...@apache.org>
Authored: Thu Mar 7 16:52:10 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Mar 7 16:53:01 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java     |   34 ++--
 .../java/org/apache/flume/source/AvroSource.java   |   67 ++++++-
 .../java/org/apache/flume/sink/TestAvroSink.java   |  146 ++++++++++++++-
 .../org/apache/flume/source/TestAvroSource.java    |   92 +++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   48 +++---
 .../org/apache/flume/api/NettyAvroRpcClient.java   |   70 +++++++-
 .../flume/api/RpcClientConfigurationConstants.java |    9 +
 .../java/org/apache/flume/api/RpcTestUtils.java    |   94 ++++++++-
 .../apache/flume/api/TestNettyAvroRpcClient.java   |   90 +++++++++
 9 files changed, 578 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index 52bd49b..f5699e4 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 /**
@@ -103,6 +104,19 @@ import java.util.Properties;
  * <td>milliseconds (long)</td>
  * <td>20000</td>
  * </tr>
+ * <tr>
+ * <td><tt>compression-type</tt></td>
+ * <td>Select compression type.  Default is "none" and the only compression type available is "deflate"</td>
+ * <td>compression type</td>
+ * <td>none</td>
+ * </tr>
+ * <tr>
+ * <td><tt>compression-level</tt></td>
+ * <td>When the compression type is "deflate", this value can be between 0 and 9: 0 means no compression and
+ * 1-9 enable compression.  The higher the number, the better the compression.  6 is the default.</td>
+ * <td>compression level</td>
+ * <td>6</td>
+ * </tr>
  * </table>
  * <p>
  * <b>Metrics</b>
@@ -141,24 +155,8 @@ public abstract class AbstractRpcSink extends AbstractSink
     clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
         "h1", hostname + ":" + port);
 
-    Integer batchSize = context.getInteger("batch-size");
-    if (batchSize != null) {
-      clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE,
-          String.valueOf(batchSize));
-    }
-
-    Long connectTimeout = context.getLong("connect-timeout");
-    if (connectTimeout != null) {
-      clientProps.setProperty(
-          RpcClientConfigurationConstants.CONFIG_CONNECT_TIMEOUT,
-          String.valueOf(connectTimeout));
-    }
-
-    Long requestTimeout = context.getLong("request-timeout");
-    if (requestTimeout != null) {
-      clientProps.setProperty(
-          RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
-          String.valueOf(requestTimeout));
+    for (Entry<String, String> entry: context.getParameters().entrySet()) {
+      clientProps.setProperty(entry.getKey(), entry.getValue());
     }
 
     if (sinkCounter == null) {

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index dc18c5d..517d545 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -46,7 +46,12 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,8 +120,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
   private static final String PORT_KEY = "port";
   private static final String BIND_KEY = "bind";
+  private static final String COMPRESSION_TYPE = "compression-type";
   private int port;
   private String bindAddress;
+  private String compressionType;
 
   private Server server;
   private SourceCounter sourceCounter;
@@ -130,6 +137,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
     port = context.getInteger(PORT_KEY);
     bindAddress = context.getString(BIND_KEY);
+    compressionType = context.getString(COMPRESSION_TYPE, "none");
+
     try {
       maxThreads = context.getInteger(THREADS, 0);
     } catch (NumberFormatException e) {
@@ -147,15 +156,14 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     logger.info("Starting {}...", this);
 
     Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
-    if(maxThreads <= 0) {
-      server = new NettyServer(responder,
-              new InetSocketAddress(bindAddress, port));
-    } else {
-      server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
-              new NioServerSocketChannelFactory(
-                      Executors.newCachedThreadPool(),
-                      Executors.newFixedThreadPool(maxThreads)));
-    }
+
+    NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
+
+    ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
+
+    server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
+          socketChannelFactory, pipelineFactory, null);
+
     connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
     server.start();
     sourceCounter.start();
@@ -173,6 +181,34 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     logger.info("Avro source {} started.", getName());
   }
 
+  private NioServerSocketChannelFactory initSocketChannelFactory() {
+    NioServerSocketChannelFactory socketChannelFactory;
+    if (maxThreads <= 0) {
+      socketChannelFactory = new NioServerSocketChannelFactory
+          (Executors .newCachedThreadPool(), Executors.newCachedThreadPool());
+    } else {
+      socketChannelFactory = new NioServerSocketChannelFactory(
+          Executors.newCachedThreadPool(),
+          Executors.newFixedThreadPool(maxThreads));
+    }
+    return socketChannelFactory;
+  }
+
+  private ChannelPipelineFactory initChannelPipelineFactory() {
+    ChannelPipelineFactory pipelineFactory;
+    if (compressionType.equalsIgnoreCase("deflate")) {
+      pipelineFactory = new CompressionChannelPipelineFactory();
+    } else {
+      pipelineFactory = new ChannelPipelineFactory() {
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+          return Channels.pipeline();
+        }
+      };
+    }
+    return pipelineFactory;
+  }
+
   @Override
   public void stop() {
     logger.info("Avro source {} stopping: {}", getName(), this);
@@ -276,4 +312,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
     return Status.OK;
   }
+
+  private static class CompressionChannelPipelineFactory implements
+  ChannelPipelineFactory {
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      ZlibEncoder encoder = new ZlibEncoder(6);
+      pipeline.addFirst("deflater", encoder);
+      pipeline.addFirst("inflater", new ZlibDecoder());
+      return pipeline;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index b9e59ef..3b1c8db 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -19,8 +19,11 @@
 
 package org.apache.flume.sink;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,22 +31,29 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.base.Charsets;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,8 +70,13 @@ public class TestAvroSink {
   private AvroSink sink;
   private Channel channel;
 
-  @Before
+
   public void setUp() {
+    setUp("none", 0);
+  }
+
+  public void setUp(String compressionType, int compressionLevel) {
+    if (sink != null) { throw new RuntimeException("double setup");}
     sink = new AvroSink();
     channel = new MemoryChannel();
 
@@ -72,6 +87,10 @@ public class TestAvroSink {
     context.put("batch-size", String.valueOf(2));
     context.put("connect-timeout", String.valueOf(2000L));
     context.put("request-timeout", String.valueOf(3000L));
+    if (compressionType.equals("deflate")) {
+      context.put("compression-type", compressionType);
+      context.put("compression-level", Integer.toString(compressionLevel));
+    }
 
     sink.setChannel(channel);
 
@@ -82,6 +101,7 @@ public class TestAvroSink {
   @Test
   public void testLifecycle() throws InterruptedException,
       InstantiationException, IllegalAccessException {
+    setUp();
     Server server = createServer(new MockAvroServer());
 
     server.start();
@@ -100,6 +120,7 @@ public class TestAvroSink {
   @Test
   public void testProcess() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
 
     Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     Server server = createServer(new MockAvroServer());
@@ -136,6 +157,7 @@ public class TestAvroSink {
   @Test
   public void testTimeout() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
     Event event = EventBuilder.withBody("foo", Charsets.UTF_8);
     AtomicLong delay = new AtomicLong();
     Server server = createServer(new DelayMockAvroServer(delay));
@@ -192,6 +214,7 @@ public class TestAvroSink {
   public void testFailedConnect() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
 
+    setUp();
     Event event = EventBuilder.withBody("test event 1",
         Charset.forName("UTF8"));
     Server server = createServer(new MockAvroServer());
@@ -243,6 +266,7 @@ public class TestAvroSink {
 
   private Server createServer(AvroSourceProtocol protocol)
       throws IllegalAccessException, InstantiationException {
+
     Server server = new NettyServer(new SpecificResponder(
         AvroSourceProtocol.class, protocol), new InetSocketAddress(
         hostname, port));
@@ -250,6 +274,126 @@ public class TestAvroSink {
     return server;
   }
 
+  @Test
+  public void testRequestWithNoCompression() throws InterruptedException, IOException, EventDeliveryException {
+
+    doRequest(false, false, 6);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel0() throws InterruptedException, IOException, EventDeliveryException {
+
+    doRequest(true, true, 0);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel1() throws InterruptedException, IOException, EventDeliveryException {
+
+    doRequest(true, true, 1);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel6() throws InterruptedException, IOException, EventDeliveryException {
+
+    doRequest(true, true, 6);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel9() throws InterruptedException, IOException, EventDeliveryException {
+
+    doRequest(true, true, 9);
+  }
+
+  private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, int compressionLevel) throws InterruptedException, IOException, EventDeliveryException {
+
+    if (clientEnableCompression) {
+      setUp("deflate", compressionLevel);
+    } else {
+      setUp("none", compressionLevel);
+    }
+
+    boolean bound = false;
+
+    AvroSource source;
+    Channel sourceChannel;
+    int selectedPort;
+
+    source = new AvroSource();
+    sourceChannel = new MemoryChannel();
+
+    Configurables.configure(sourceChannel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(sourceChannel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    Context context = new Context();
+    context.put("port", port.toString());
+    context.put("bind", hostname);
+    context.put("threads", "50");
+    if (serverEnableCompression) {
+      context.put("compression-type", "deflate");
+    } else {
+      context.put("compression-type", "none");
+    }
+
+    Configurables.configure(source, context);
+
+    source.start();
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+
+    Event event = EventBuilder.withBody("Hello avro",
+        Charset.forName("UTF8"));
+
+    sink.start();
+
+    Transaction sickTransaction = channel.getTransaction();
+
+    sickTransaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    sickTransaction.commit();
+    sickTransaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      Sink.Status status = sink.process();
+      logger.debug("Calling Process " + i + " times:" + status);
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    sink.stop();
+
+
+    Transaction sourceTransaction = sourceChannel.getTransaction();
+    sourceTransaction.begin();
+
+    Event sourceEvent = sourceChannel.take();
+    Assert.assertNotNull(sourceEvent);
+    Assert.assertEquals("Channel contained our event", "Hello avro",
+        new String(sourceEvent.getBody()));
+    sourceTransaction.commit();
+    sourceTransaction.close();
+
+    logger.debug("Round trip event:{}", sourceEvent);
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
   private static class MockAvroServer implements AvroSourceProtocol {
 
     @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 4bf36e6..c699241 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -43,6 +44,11 @@ import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
 import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -111,16 +117,61 @@ public class TestAvroSource {
   }
 
   @Test
-  public void testRequest() throws InterruptedException, IOException {
+  public void testRequestWithNoCompression() throws InterruptedException, IOException {
+
+    doRequest(false, false, 6);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel0() throws InterruptedException, IOException {
+
+    doRequest(true, true, 0);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel1() throws InterruptedException, IOException {
+
+    doRequest(true, true, 1);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel6() throws InterruptedException, IOException {
+
+    doRequest(true, true, 6);
+  }
+
+  @Test
+  public void testRequestWithCompressionOnClientAndServerOnLevel9() throws InterruptedException, IOException {
+
+    doRequest(true, true, 9);
+  }
+
+  @Test(expected=org.apache.avro.AvroRemoteException.class)
+  public void testRequestWithCompressionOnServerOnly() throws InterruptedException, IOException {
+    //This will fail because both client and server need compression on
+    doRequest(true, false, 6);
+  }
+
+  @Test(expected=org.apache.avro.AvroRemoteException.class)
+  public void testRequestWithCompressionOnClientOnly() throws InterruptedException, IOException {
+    //This will fail because both client and server need compression on
+    doRequest(false, true, 6);
+  }
+
+  private void doRequest(boolean serverEnableCompression, boolean clientEnableCompression, int compressionLevel) throws InterruptedException, IOException {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
       try {
         Context context = new Context();
-
         context.put("port", String.valueOf(selectedPort = 41414 + i));
         context.put("bind", "0.0.0.0");
         context.put("threads", "50");
+        if (serverEnableCompression) {
+          context.put("compression-type", "deflate");
+        } else {
+          context.put("compression-type", "none");
+        }
 
         Configurables.configure(source, context);
 
@@ -140,9 +191,16 @@ public class TestAvroSource {
     Assert.assertEquals("Server is started", LifecycleState.START,
         source.getLifecycleState());
 
-    AvroSourceProtocol client = SpecificRequestor.getClient(
-        AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
-            selectedPort)));
+    AvroSourceProtocol client;
+    if (clientEnableCompression) {
+      client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
+              selectedPort), new CompressionChannelFactory(6)));
+    } else {
+      client = SpecificRequestor.getClient(
+          AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
+              selectedPort)));
+    }
 
     AvroFlumeEvent avroEvent = new AvroFlumeEvent();
 
@@ -172,4 +230,28 @@ public class TestAvroSource {
         source.getLifecycleState());
   }
 
+
+  private static class CompressionChannelFactory extends
+      NioClientSocketChannelFactory {
+    private int compressionLevel;
+
+    public CompressionChannelFactory( int compressionLevel) {
+      super();
+      this.compressionLevel = compressionLevel;
+    }
+
+    @Override
+    public SocketChannel newChannel(ChannelPipeline pipeline) {
+      try {
+
+        ZlibEncoder encoder = new ZlibEncoder(compressionLevel);
+        pipeline.addFirst("deflater", encoder);
+        pipeline.addFirst("inflater", new ZlibDecoder());
+        return super.newChannel(pipeline);
+      } catch (Exception ex) {
+        throw new RuntimeException("Cannot create Compression channel", ex);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 01067d1..f9088f9 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -652,7 +652,7 @@ be written to the default channels and will be attempted to be written to the
 optional channels for that header. Specifying optional channels will still cause
 the event to be written to the default channels, if no required channels are
 specified. If no channels are designated as default and there are no required,
- the selector will attempt to write the events to the optional channels. Any
+the selector will attempt to write the events to the optional channels. Any
 failures are simply ignored in that case.
 
 
@@ -667,19 +667,20 @@ When paired with the built-in AvroSink on another (previous hop) Flume agent,
 it can create tiered collection topologies.
 Required properties are in **bold**.
 
-==============  ===========  ===================================================
-Property Name   Default      Description
-==============  ===========  ===================================================
-**channels**    --
-**type**        --           The component type name, needs to be ``avro``
-**bind**        --           hostname or IP address to listen on
-**port**        --           Port # to bind to
-threads         --           Maximum number of worker threads to spawn
+==================   ===========  ===================================================
+Property Name        Default      Description
+==================   ===========  ===================================================
+**channels**         --
+**type**             --           The component type name, needs to be ``avro``
+**bind**             --           hostname or IP address to listen on
+**port**             --           Port # to bind to
+threads              --           Maximum number of worker threads to spawn
 selector.type
 selector.*
-interceptors    --           Space separated list of interceptors
+interceptors         --           Space separated list of interceptors
 interceptors.*
-==============  ===========  ===================================================
+compression-type     none         This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSink
+==================   ===========  ===================================================
 
 Example for agent named a1:
 
@@ -1440,18 +1441,19 @@ hostname / port pair. The events are taken from the configured Channel in
 batches of the configured batch size.
 Required properties are in **bold**.
 
-===============  =======  ==============================================
-Property Name    Default  Description
-===============  =======  ==============================================
-**channel**      --
-**type**         --       The component type name, needs to be ``avro``.
-**hostname**     --       The hostname or IP address to bind to.
-**port**         --       The port # to listen on.
-batch-size       100      number of event to batch together for send.
-connect-timeout  20000    Amount of time (ms) to allow for the first (handshake) request.
-request-timeout  20000    Amount of time (ms) to allow for requests after the first.
-
-===============  =======  ==============================================
+===================   =======  ==============================================
+Property Name         Default  Description
+===================   =======  ==============================================
+**channel**           --
+**type**              --       The component type name, needs to be ``avro``.
+**hostname**          --       The hostname or IP address to bind to.
+**port**              --       The port # to listen on.
+batch-size            100      number of event to batch together for send.
+connect-timeout       20000    Amount of time (ms) to allow for the first (handshake) request.
+request-timeout       20000    Amount of time (ms) to allow for requests after the first.
+compression-type      none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
+compression-level     6        The level of compression applied to events. 0 = no compression and 1-9 is compression.  The higher the number the more compression
+===================   =======  ==============================================
 
 Example for agent named a1:
 

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index cf9724c..8285129 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -30,6 +30,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -51,7 +52,12 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +83,8 @@ implements RpcClient {
   private AvroSourceProtocol.Callback avroClient;
   private static final Logger logger = LoggerFactory
       .getLogger(NettyAvroRpcClient.class);
+  private boolean enableDeflateCompression;
+  private int compressionLevel;
 
   /**
    * This constructor is intended to be called from {@link RpcClientFactory}.
@@ -103,12 +111,25 @@ implements RpcClient {
     callTimeoutPool = Executors.newCachedThreadPool(
         new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
     try {
+
+      NioClientSocketChannelFactory socketChannelFactory;
+
+      if (enableDeflateCompression) {
+        socketChannelFactory = new CompressionChannelFactory(
+            Executors.newCachedThreadPool(new TransceiverThreadFactory(
+                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
+            Executors.newCachedThreadPool(new TransceiverThreadFactory(
+                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")), compressionLevel);
+      } else {
+        socketChannelFactory = new NioClientSocketChannelFactory(
+            Executors.newCachedThreadPool(new TransceiverThreadFactory(
+                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
+            Executors.newCachedThreadPool(new TransceiverThreadFactory(
+                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")));
+      }
+
       transceiver = new NettyTransceiver(this.address,
-          new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(new TransceiverThreadFactory(
-            "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
-        Executors.newCachedThreadPool(new TransceiverThreadFactory(
-            "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))),
+          socketChannelFactory,
           tu.toMillis(timeout));
       avroClient =
           SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
@@ -511,6 +532,21 @@ implements RpcClient {
       }
     }
 
+    String enableCompressionStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE);
+    if (enableCompressionStr != null && enableCompressionStr.equalsIgnoreCase("deflate")) {
+      this.enableDeflateCompression = true;
+      String compressionLvlStr = properties.getProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL);
+      compressionLevel = RpcClientConfigurationConstants.DEFAULT_COMPRESSION_LEVEL;
+      if (compressionLvlStr != null) {
+        try {
+          compressionLevel = Integer.parseInt(compressionLvlStr);
+        } catch (NumberFormatException ex) {
+          logger.error("Invalid compression level: " + compressionLvlStr);
+        }
+      }
+    }
+
+
     this.connect();
   }
 
@@ -545,4 +581,28 @@ implements RpcClient {
       return thread;
     }
   }
+
+  private static class CompressionChannelFactory extends
+      NioClientSocketChannelFactory {
+    private int compressionLevel; // level handed to each ZlibEncoder created in newChannel
+
+    public CompressionChannelFactory(
+        Executor bossExecutor, Executor workerExecutor, int compressionLevel) {
+      super(bossExecutor, workerExecutor);
+      this.compressionLevel = compressionLevel;
+    }
+
+    @Override
+    public SocketChannel newChannel(ChannelPipeline pipeline) {
+      try {
+        // Insert the zlib handlers at the head of the pipeline before the channel is created.
+        ZlibEncoder encoder = new ZlibEncoder(compressionLevel);
+        pipeline.addFirst("deflater", encoder);
+        pipeline.addFirst("inflater", new ZlibDecoder());
+        return super.newChannel(pipeline);
+      } catch (Exception ex) {
+        throw new RuntimeException("Cannot create Compression channel", ex);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 1e642d8..34d73a3 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -126,6 +126,15 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_CONNECTION_POOL_SIZE = "maxConnections";
   public static final int DEFAULT_CONNECTION_POOL_SIZE = 5;
 
+  /**
+   * The following are constants for the NettyAvro client, used to enable
+   * compression and to set the compression level.
+   */
+  public static final String CONFIG_COMPRESSION_TYPE = "compression-type";
+  public static final String CONFIG_COMPRESSION_LEVEL = "compression-level";
+  public static final int DEFAULT_COMPRESSION_LEVEL = 6;
+
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
index 5042d11..8806860 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java
@@ -23,6 +23,8 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+
 import junit.framework.Assert;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -36,6 +38,12 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,10 +66,28 @@ public class RpcTestUtils {
    */
   public static void handlerSimpleAppendTest(AvroSourceProtocol handler)
       throws FlumeException, EventDeliveryException {
+    handlerSimpleAppendTest(handler, false, false, 0);
+  }
+
+  /**
+   * Helper method for testing simple (single) appends on handlers, with optional server/client compression
+   * @param handler
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  public static void handlerSimpleAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel)
+      throws FlumeException, EventDeliveryException {
     NettyAvroRpcClient client = null;
-    Server server = startServer(handler);
+    Server server = startServer(handler, 0, enableServerCompression);
     try {
-      client = getStockLocalClient(server.getPort());
+      Properties starterProp = new Properties();
+      if (enableClientCompression) {
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate");
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + compressionLevel);
+      } else {
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "none");
+      }
+      client = getStockLocalClient(server.getPort(), starterProp);
       boolean isActive = client.isActive();
       Assert.assertTrue("Client should be active", isActive);
       client.append(EventBuilder.withBody("wheee!!!", Charset.forName("UTF8")));
@@ -71,18 +97,32 @@ public class RpcTestUtils {
     }
   }
 
+  public static void handlerBatchAppendTest(AvroSourceProtocol handler)
+      throws FlumeException, EventDeliveryException {
+    handlerBatchAppendTest(handler, false, false, 0);
+  }
+
   /**
    * Helper method for testing batch appends on handlers
    * @param handler
    * @throws FlumeException
    * @throws EventDeliveryException
    */
-  public static void handlerBatchAppendTest(AvroSourceProtocol handler)
+  public static void handlerBatchAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel)
       throws FlumeException, EventDeliveryException {
     NettyAvroRpcClient client = null;
-    Server server = startServer(handler);
+    Server server = startServer(handler, 0 , enableServerCompression);
     try {
-      client = getStockLocalClient(server.getPort());
+
+      Properties starterProp = new Properties();
+      if (enableClientCompression) {
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate");
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + compressionLevel);
+      } else {
+        starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "none");
+      }
+
+      client = getStockLocalClient(server.getPort(), starterProp);
       boolean isActive = client.isActive();
       Assert.assertTrue("Client should be active", isActive);
 
@@ -104,11 +144,16 @@ public class RpcTestUtils {
    */
   public static NettyAvroRpcClient getStockLocalClient(int port) {
     Properties props = new Properties();
-    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
-    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
+
+    return getStockLocalClient(port, props);
+  }
+
+  public static NettyAvroRpcClient getStockLocalClient(int port, Properties starterProp) {
+    starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
+    starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
         "127.0.0.1" + ":" + port);
     NettyAvroRpcClient client = new NettyAvroRpcClient();
-    client.configure(props);
+    client.configure(starterProp);
 
     return client;
   }
@@ -116,11 +161,20 @@ public class RpcTestUtils {
   /**
    * Start a NettyServer, wait a moment for it to spin up, and return it.
    */
-  public static Server startServer(AvroSourceProtocol handler, int port) {
+  public static Server startServer(AvroSourceProtocol handler, int port, boolean enableCompression) {
     Responder responder = new SpecificResponder(AvroSourceProtocol.class,
         handler);
-    Server server = new NettyServer(responder,
+    Server server;
+    if (enableCompression) {
+      server = new NettyServer(responder,
+          new InetSocketAddress(localhost, port),
+          new NioServerSocketChannelFactory
+          (Executors .newCachedThreadPool(), Executors.newCachedThreadPool()),
+          new CompressionChannelPipelineFactory(), null);
+    } else {
+      server = new NettyServer(responder,
         new InetSocketAddress(localhost, port));
+    }
     server.start();
     logger.info("Server started on hostname: {}, port: {}",
         new Object[] { localhost, Integer.toString(server.getPort()) });
@@ -138,9 +192,14 @@ public class RpcTestUtils {
   }
 
   public static Server startServer(AvroSourceProtocol handler) {
-    return startServer(handler, 0);
+    return startServer(handler, 0, false);
+  }
+
+  public static Server startServer(AvroSourceProtocol handler, int port) {
+    return startServer(handler, port, false);
   }
 
+
   /**
    * Request that the specified Server stop, and attempt to wait for it to exit.
    * @param server A running NettyServer
@@ -297,4 +356,17 @@ public class RpcTestUtils {
 
   }
 
+  private static class CompressionChannelPipelineFactory implements
+  ChannelPipelineFactory {
+    // Builds the pipeline used by startServer when compression is enabled.
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      ZlibEncoder encoder = new ZlibEncoder(6); // server-side level is fixed at 6
+      pipeline.addFirst("deflater", encoder);
+      pipeline.addFirst("inflater", new ZlibDecoder());
+      return pipeline;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e72e559b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
index 99ae010..1e6d2b2 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
@@ -62,6 +62,52 @@ public class TestNettyAvroRpcClient {
   }
 
   /**
+   * Simple request with compression on the server and client with compression level 6
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerSimpleCompressionLevel6() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), true, true, 6);
+  }
+
+  /**
+   * Simple request with compression on the server and client; the client uses compression level 0
+   *
+   * Compression level 0 = no compression
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerSimpleCompressionLevel0() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), true, true, 0);
+  }
+
+  /**
+   * Simple request with compression on the client only; an EventDeliveryException is expected
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=org.apache.flume.EventDeliveryException.class)
+  public void testOKServerSimpleCompressionClientOnly() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), false, true, 6);
+  }
+
+  /**
+   * Simple request with compression on the server only; an EventDeliveryException is expected
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=org.apache.flume.EventDeliveryException.class)
+  public void testOKServerSimpleCompressionServerOnly() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), true, false, 6);
+  }
+
+  /**
    * Simple batch request
    * @throws FlumeException
    * @throws EventDeliveryException
@@ -73,6 +119,50 @@ public class TestNettyAvroRpcClient {
   }
 
   /**
+   * Simple batch request with deflate compression; the client uses compression level 0
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerBatchCompressionLevel0() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), true, true, 0);
+  }
+
+  /**
+   * Simple batch request with compression deflate level 6
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test
+  public void testOKServerBatchCompressionLevel6() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), true, true, 6);
+  }
+
+  /**
+   * Simple batch request where only the server is using compression; an EventDeliveryException is expected
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=org.apache.flume.EventDeliveryException.class)
+  public void testOKServerBatchCompressionServerOnly() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), true, false, 6);
+  }
+
+  /**
+   * Simple batch request where only the client is using compression; an EventDeliveryException is expected
+   * @throws FlumeException
+   * @throws EventDeliveryException
+   */
+  @Test(expected=org.apache.flume.EventDeliveryException.class)
+  public void testOKServerBatchCompressionClientOnly() throws FlumeException,
+      EventDeliveryException {
+    RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), false, true, 6);
+  }
+
+  /**
    * Try to connect to a closed port.
    * Note: this test tries to connect to port 1 on localhost.
    * @throws FlumeException