You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ju...@apache.org on 2013/03/14 07:01:26 UTC

git commit: FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness

Updated Branches:
  refs/heads/flume-1.4 113dcdca6 -> 3dc9069a3


FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness

(Hari Shreedharan via Juhani Connolly)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3dc9069a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3dc9069a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3dc9069a

Branch: refs/heads/flume-1.4
Commit: 3dc9069a3c28af6174ca659469345a28a99f91a2
Parents: 113dcdc
Author: Juhani Connolly <ju...@cyberagent.co.jp>
Authored: Thu Mar 14 15:00:28 2013 +0900
Committer: Juhani Connolly <ju...@cyberagent.co.jp>
Committed: Thu Mar 14 15:00:28 2013 +0900

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java     |   40 ++++++++++
 .../java/org/apache/flume/sink/TestAvroSink.java   |   61 +++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   25 +++---
 3 files changed, 114 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index f5699e4..892c949 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -18,8 +18,11 @@
  */
 package org.apache.flume.sink;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -37,6 +40,9 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This sink provides the basic RPC functionality for Flume. This sink takes
@@ -140,6 +146,11 @@ public abstract class AbstractRpcSink extends AbstractSink
   private RpcClient client;
   private Properties clientProps;
   private SinkCounter sinkCounter;
+  private int cxnResetInterval;
+  private final int DEFAULT_CXN_RESET_INTERVAL = 0;
+  private final ScheduledExecutorService cxnResetExecutor = Executors
+    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+      .setNameFormat("Rpc Sink Reset Thread").build());
 
   @Override
   public void configure(Context context) {
@@ -162,6 +173,13 @@ public abstract class AbstractRpcSink extends AbstractSink
     if (sinkCounter == null) {
       sinkCounter = new SinkCounter(getName());
     }
+    cxnResetInterval = context.getInteger("reset-connection-interval",
+      DEFAULT_CXN_RESET_INTERVAL);
+    if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {
+      logger.info("Connection reset is set to " + String.valueOf
+        (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " +
+        "hop");
+    }
   }
 
   /**
@@ -189,6 +207,14 @@ public abstract class AbstractRpcSink extends AbstractSink
         Preconditions.checkNotNull(client, "Rpc Client could not be " +
           "initialized. " + getName() + " could not be started");
         sinkCounter.incrementConnectionCreatedCount();
+        if (cxnResetInterval > 0) {
+          cxnResetExecutor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              destroyConnection();
+            }
+          }, cxnResetInterval, TimeUnit.SECONDS);
+        }
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
         if (ex instanceof FlumeException) {
@@ -266,6 +292,15 @@ public abstract class AbstractRpcSink extends AbstractSink
     logger.info("Rpc sink {} stopping...", getName());
 
     destroyConnection();
+    cxnResetExecutor.shutdown();
+    try {
+      if (!cxnResetExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        cxnResetExecutor.shutdownNow(); // still running after 5s: force stop
+      }
+    } catch (Exception ex) {
+      logger.error("Interrupted while waiting for connection reset executor " +
+        "to shut down", ex);
+    }
     sinkCounter.stop();
     super.stop();
 
@@ -338,4 +373,9 @@ public abstract class AbstractRpcSink extends AbstractSink
 
     return status;
   }
+
+  @VisibleForTesting
+  RpcClient getUnderlyingClient() {
+    return client;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 3b1c8db..ac47ee9 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -43,6 +43,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.api.RpcClient;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
@@ -264,6 +265,66 @@ public class TestAvroSink {
     server.close();
   }
 
+  @Test
+  public void testReset() throws Exception {
+
+    setUp();
+    Server server = createServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+    context.put("reset-connection-interval", String.valueOf("5"));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    RpcClient firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are not the same object, connection should be reset
+    Assert.assertFalse(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+    context.put("reset-connection-interval", String.valueOf("0"));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are the same object, since connection should not be reset
+    Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+
+    context.clear();
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    sink.setChannel(channel);
+    Configurables.configure(sink, context);
+    sink.start();
+    firstClient = sink.getUnderlyingClient();
+    Thread.sleep(6000);
+    // Make sure they are the same object, since connection should not be reset
+    Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+    sink.stop();
+    server.close();
+  }
+
   private Server createServer(AvroSourceProtocol protocol)
       throws IllegalAccessException, InstantiationException {
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d72c965..b0dcbfc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1453,18 +1453,19 @@ hostname / port pair. The events are taken from the configured Channel in
 batches of the configured batch size.
 Required properties are in **bold**.
 
-===================   =======  ==============================================
-Property Name         Default  Description
-===================   =======  ==============================================
-**channel**           --
-**type**              --       The component type name, needs to be ``avro``.
-**hostname**          --       The hostname or IP address to bind to.
-**port**              --       The port # to listen on.
-batch-size            100      number of event to batch together for send.
-connect-timeout       20000    Amount of time (ms) to allow for the first (handshake) request.
-request-timeout       20000    Amount of time (ms) to allow for requests after the first.
-compression-type      none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-compression-level     6	       The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
+==========================   =======  ==============================================
+Property Name                Default  Description
+==========================   =======  ==============================================
+**channel**                  --
+**type**                     --       The component type name, needs to be ``avro``.
+**hostname**                 --       The hostname or IP address to bind to.
+**port**                     --       The port # to listen on.
+batch-size                   100      number of event to batch together for send.
+connect-timeout              20000    Amount of time (ms) to allow for the first (handshake) request.
+request-timeout              20000    Amount of time (ms) to allow for requests after the first.
+reset-connection-interval    none     Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
+compression-type             none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
+compression-level            6	      The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
-===================   =======  ==============================================
+==========================   =======  ==============================================
 
 Example for agent named a1: