You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ju...@apache.org on 2013/03/14 07:01:26 UTC
git commit: FLUME-1926. Optionally timeout Avro Sink Rpc Clients to
avoid stickiness
Updated Branches:
refs/heads/flume-1.4 113dcdca6 -> 3dc9069a3
FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness
(Hari Shreedharan via Juhani Connolly)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3dc9069a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3dc9069a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3dc9069a
Branch: refs/heads/flume-1.4
Commit: 3dc9069a3c28af6174ca659469345a28a99f91a2
Parents: 113dcdc
Author: Juhani Connolly <ju...@cyberagent.co.jp>
Authored: Thu Mar 14 15:00:28 2013 +0900
Committer: Juhani Connolly <ju...@cyberagent.co.jp>
Committed: Thu Mar 14 15:00:28 2013 +0900
----------------------------------------------------------------------
.../org/apache/flume/sink/AbstractRpcSink.java | 40 ++++++++++
.../java/org/apache/flume/sink/TestAvroSink.java | 61 +++++++++++++++
flume-ng-doc/sphinx/FlumeUserGuide.rst | 25 +++---
3 files changed, 114 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index f5699e4..892c949 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -18,8 +18,11 @@
*/
package org.apache.flume.sink;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
@@ -37,6 +40,9 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* This sink provides the basic RPC functionality for Flume. This sink takes
@@ -140,6 +146,11 @@ public abstract class AbstractRpcSink extends AbstractSink
private RpcClient client;
private Properties clientProps;
private SinkCounter sinkCounter;
+ private int cxnResetInterval;
+ private final int DEFAULT_CXN_RESET_INTERVAL = 0;
+ private final ScheduledExecutorService cxnResetExecutor = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("Rpc Sink Reset Thread").build());
@Override
public void configure(Context context) {
@@ -162,6 +173,13 @@ public abstract class AbstractRpcSink extends AbstractSink
if (sinkCounter == null) {
sinkCounter = new SinkCounter(getName());
}
+ cxnResetInterval = context.getInteger("reset-connection-interval",
+ DEFAULT_CXN_RESET_INTERVAL);
+ if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {
+ logger.info("Connection reset is set to " + String.valueOf
+ (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " +
+ "hop");
+ }
}
/**
@@ -189,6 +207,14 @@ public abstract class AbstractRpcSink extends AbstractSink
Preconditions.checkNotNull(client, "Rpc Client could not be " +
"initialized. " + getName() + " could not be started");
sinkCounter.incrementConnectionCreatedCount();
+ if (cxnResetInterval > 0) {
+ cxnResetExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ destroyConnection();
+ }
+ }, cxnResetInterval, TimeUnit.SECONDS);
+ }
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
if (ex instanceof FlumeException) {
@@ -266,6 +292,15 @@ public abstract class AbstractRpcSink extends AbstractSink
logger.info("Rpc sink {} stopping...", getName());
destroyConnection();
+ cxnResetExecutor.shutdown();
+ try {
+ if (cxnResetExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ cxnResetExecutor.shutdownNow();
+ }
+ } catch (Exception ex) {
+ logger.error("Interrupted while waiting for connection reset executor " +
+ "to shut down");
+ }
sinkCounter.stop();
super.stop();
@@ -338,4 +373,9 @@ public abstract class AbstractRpcSink extends AbstractSink
return status;
}
+
+ @VisibleForTesting
+ RpcClient getUnderlyingClient() {
+ return client;
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 3b1c8db..ac47ee9 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -43,6 +43,7 @@ import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.api.RpcClient;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
@@ -264,6 +265,66 @@ public class TestAvroSink {
server.close();
}
+ @Test
+ public void testReset() throws Exception {
+
+ setUp();
+ Server server = createServer(new MockAvroServer());
+
+ server.start();
+
+ Context context = new Context();
+
+ context.put("hostname", hostname);
+ context.put("port", String.valueOf(port));
+ context.put("batch-size", String.valueOf(2));
+ context.put("connect-timeout", String.valueOf(2000L));
+ context.put("request-timeout", String.valueOf(3000L));
+ context.put("reset-connection-interval", String.valueOf("5"));
+
+ sink.setChannel(channel);
+ Configurables.configure(sink, context);
+ sink.start();
+ RpcClient firstClient = sink.getUnderlyingClient();
+ Thread.sleep(6000);
+ // Make sure they are not the same object, connection should be reset
+ Assert.assertFalse(firstClient == sink.getUnderlyingClient());
+ sink.stop();
+
+ context.put("hostname", hostname);
+ context.put("port", String.valueOf(port));
+ context.put("batch-size", String.valueOf(2));
+ context.put("connect-timeout", String.valueOf(2000L));
+ context.put("request-timeout", String.valueOf(3000L));
+ context.put("reset-connection-interval", String.valueOf("0"));
+
+ sink.setChannel(channel);
+ Configurables.configure(sink, context);
+ sink.start();
+ firstClient = sink.getUnderlyingClient();
+ Thread.sleep(6000);
+ // Make sure they are the same object, since connection should not be reset
+ Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+ sink.stop();
+
+ context.clear();
+ context.put("hostname", hostname);
+ context.put("port", String.valueOf(port));
+ context.put("batch-size", String.valueOf(2));
+ context.put("connect-timeout", String.valueOf(2000L));
+ context.put("request-timeout", String.valueOf(3000L));
+
+ sink.setChannel(channel);
+ Configurables.configure(sink, context);
+ sink.start();
+ firstClient = sink.getUnderlyingClient();
+ Thread.sleep(6000);
+ // Make sure they are the same object, since connection should not be reset
+ Assert.assertTrue(firstClient == sink.getUnderlyingClient());
+ sink.stop();
+ server.close();
+ }
+
private Server createServer(AvroSourceProtocol protocol)
throws IllegalAccessException, InstantiationException {
http://git-wip-us.apache.org/repos/asf/flume/blob/3dc9069a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d72c965..b0dcbfc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1453,18 +1453,19 @@ hostname / port pair. The events are taken from the configured Channel in
batches of the configured batch size.
Required properties are in **bold**.
-=================== ======= ==============================================
-Property Name Default Description
-=================== ======= ==============================================
-**channel** --
-**type** -- The component type name, needs to be ``avro``.
-**hostname** -- The hostname or IP address to bind to.
-**port** -- The port # to listen on.
-batch-size 100 number of event to batch together for send.
-connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
-request-timeout 20000 Amount of time (ms) to allow for requests after the first.
-compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
-compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
+========================== ======= ==============================================
+Property Name Default Description
+========================== ======= ==============================================
+**channel** --
+**type** -- The component type name, needs to be ``avro``.
+**hostname** -- The hostname or IP address to bind to.
+**port** -- The port # to listen on.
+batch-size 100 number of event to batch together for send.
+connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
+request-timeout 20000 Amount of time (ms) to allow for requests after the first.
+reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
+compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
+compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
=================== ======= ==============================================
Example for agent named a1: