You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/06/30 11:31:43 UTC
flume git commit: FLUME-2752. Fix AvroSource startup resource leaks
Repository: flume
Updated Branches:
refs/heads/trunk d01dfd321 -> b5e5ba50f
FLUME-2752. Fix AvroSource startup resource leaks
Clean up after Netty initialisation fails (call this.stop())
- Make sure this.stop() releases the resources and leaves the component in
the LifecycleState.STOP state
- Added junit test to cover the invalid host scenario
- Added junit test to cover the used port scenario
This closes #141.
Reviewers: Denes Arvay
(Attila Simon via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b5e5ba50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b5e5ba50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b5e5ba50
Branch: refs/heads/trunk
Commit: b5e5ba50f4333272b9e2f2be2b32027e667f32e2
Parents: d01dfd3
Author: Attila Simon <sa...@cloudera.com>
Authored: Thu Jun 29 08:21:44 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Fri Jun 30 13:29:35 2017 +0200
----------------------------------------------------------------------
.../org/apache/flume/source/AvroSource.java | 55 +++---
.../org/apache/flume/source/TestAvroSource.java | 168 ++++++++++++-------
2 files changed, 140 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 762f690..e3467ec 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -20,7 +20,6 @@
package org.apache.flume.source;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
@@ -156,6 +155,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
private boolean enableIpFilter;
private String patternRuleConfigDefinition;
+ private NioServerSocketChannelFactory socketChannelFactory;
private Server server;
private SourceCounter sourceCounter;
@@ -233,14 +233,20 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
public void start() {
logger.info("Starting {}...", this);
- Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
+ try {
+ Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
- NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
+ socketChannelFactory = initSocketChannelFactory();
- ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
+ ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
- server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
- socketChannelFactory, pipelineFactory, null);
+ server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
+ socketChannelFactory, pipelineFactory, null);
+ } catch (org.jboss.netty.channel.ChannelException nce) {
+ logger.error("Avro source {} startup failed. Cannot initialize Netty server", getName(), nce);
+ stop();
+ throw new FlumeException("Failed to set up server socket", nce);
+ }
connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
server.start();
@@ -300,28 +306,31 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
public void stop() {
logger.info("Avro source {} stopping: {}", getName(), this);
- server.close();
+ if (server != null) {
+ server.close();
+ try {
+ server.join();
+ server = null;
+ } catch (InterruptedException e) {
+ logger.info("Avro source " + getName() + ": Interrupted while waiting " +
+ "for Avro server to stop. Exiting. Exception follows.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
- try {
- server.join();
- } catch (InterruptedException e) {
- logger.info("Avro source " + getName() + ": Interrupted while waiting " +
- "for Avro server to stop. Exiting. Exception follows.", e);
+ if (socketChannelFactory != null) {
+ socketChannelFactory.releaseExternalResources();
+ socketChannelFactory = null;
}
+
sourceCounter.stop();
- connectionCountUpdater.shutdown();
- while (!connectionCountUpdater.isTerminated()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- logger.error("Interrupted while waiting for connection count executor "
- + "to terminate", ex);
- Throwables.propagate(ex);
- }
+ if (connectionCountUpdater != null) {
+ connectionCountUpdater.shutdownNow();
+ connectionCountUpdater = null;
}
+
super.stop();
- logger.info("Avro source {} stopped. Metrics: {}", getName(),
- sourceCounter);
+ logger.info("Avro source {} stopped. Metrics: {}", getName(), sourceCounter);
}
@Override
http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index d73e5ad..77fcb22 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -20,12 +20,12 @@
package org.apache.flume.source;
import java.io.IOException;
-import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
+import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -52,7 +52,6 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
-import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -97,20 +96,19 @@ public class TestAvroSource {
boolean bound = false;
for (int i = 0; i < 100 && !bound; i++) {
- try {
- Context context = new Context();
-
- context.put("port", String.valueOf(selectedPort = 41414 + i));
- context.put("bind", "0.0.0.0");
- Configurables.configure(source, context);
+ Context context = new Context();
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("bind", "0.0.0.0");
+ // Invalid configuration may throw a FlumeException which has to be expected in the callers
+ Configurables.configure(source, context);
+ try {
source.start();
bound = true;
- } catch (ChannelException e) {
+ } catch (FlumeException e) {
/*
- * NB: This assume we're using the Netty server under the hood and the
- * failure is to bind. Yucky.
+ * NB: This assume the failure is to bind.
*/
}
}
@@ -129,6 +127,62 @@ public class TestAvroSource {
}
@Test
+ public void testSourceStoppedOnFlumeExceptionIfPortUsed()
+ throws InterruptedException, IOException {
+ final String loopbackIPv4 = "127.0.0.1";
+ final int port = 10500;
+
+ // create a dummy socket bound to a known port.
+ try (ServerSocketChannel dummyServerSocket = ServerSocketChannel.open()) {
+ dummyServerSocket.socket().setReuseAddress(true);
+ dummyServerSocket.socket().bind(new InetSocketAddress(loopbackIPv4, port));
+
+ Context context = new Context();
+ context.put("port", String.valueOf(port));
+ context.put("bind", loopbackIPv4);
+ Configurables.configure(source, context);
+ try {
+ source.start();
+ Assert.fail("Expected an exception during startup caused by binding on a used port");
+ } catch (FlumeException e) {
+ logger.info("Received an expected exception.", e);
+ Assert.assertTrue("Expected a server socket setup related root cause",
+ e.getMessage().contains("server socket"));
+ }
+ }
+ // As port is already in use, an exception is thrown and the source is stopped
+ // cleaning up the opened sockets during source.start().
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ source.getLifecycleState());
+ }
+
+ @Test
+ public void testInvalidAddress()
+ throws InterruptedException, IOException {
+ final String invalidHost = "invalid.host";
+ final int port = 10501;
+
+ Context context = new Context();
+ context.put("port", String.valueOf(port));
+ context.put("bind", invalidHost);
+ Configurables.configure(source, context);
+
+ try {
+ source.start();
+ Assert.fail("Expected an exception during startup caused by binding on a invalid host");
+ } catch (FlumeException e) {
+ logger.info("Received an expected exception.", e);
+ Assert.assertTrue("Expected a server socket setup related root cause",
+ e.getMessage().contains("server socket"));
+ }
+
+ // As the bind host is invalid, an exception is thrown and the source is stopped,
+ // cleaning up the resources opened during source.start().
+ Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+ source.getLifecycleState());
+ }
+
+ @Test
public void testRequestWithNoCompression() throws InterruptedException, IOException {
doRequest(false, false, 6);
@@ -179,25 +233,22 @@ public class TestAvroSource {
boolean bound = false;
for (int i = 0; i < 100 && !bound; i++) {
+ Context context = new Context();
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("bind", "0.0.0.0");
+ context.put("threads", "50");
+ if (serverEnableCompression) {
+ context.put("compression-type", "deflate");
+ } else {
+ context.put("compression-type", "none");
+ }
+ Configurables.configure(source, context);
try {
- Context context = new Context();
- context.put("port", String.valueOf(selectedPort = 41414 + i));
- context.put("bind", "0.0.0.0");
- context.put("threads", "50");
- if (serverEnableCompression) {
- context.put("compression-type", "deflate");
- } else {
- context.put("compression-type", "none");
- }
-
- Configurables.configure(source, context);
-
source.start();
bound = true;
- } catch (ChannelException e) {
+ } catch (FlumeException e) {
/*
- * NB: This assume we're using the Netty server under the hood and the
- * failure is to bind. Yucky.
+ * NB: This assume the failure is to bind.
*/
}
}
@@ -282,24 +333,21 @@ public class TestAvroSource {
boolean bound = false;
for (int i = 0; i < 10 && !bound; i++) {
+ Context context = new Context();
+
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("bind", "0.0.0.0");
+ context.put("ssl", "true");
+ context.put("keystore", "src/test/resources/server.p12");
+ context.put("keystore-password", "password");
+ context.put("keystore-type", "PKCS12");
+ Configurables.configure(source, context);
try {
- Context context = new Context();
-
- context.put("port", String.valueOf(selectedPort = 41414 + i));
- context.put("bind", "0.0.0.0");
- context.put("ssl", "true");
- context.put("keystore", "src/test/resources/server.p12");
- context.put("keystore-password", "password");
- context.put("keystore-type", "PKCS12");
-
- Configurables.configure(source, context);
-
source.start();
bound = true;
- } catch (ChannelException e) {
+ } catch (FlumeException e) {
/*
- * NB: This assume we're using the Netty server under the hood and the
- * failure is to bind. Yucky.
+ * NB: This assume the failure is to bind.
*/
Thread.sleep(100);
}
@@ -466,30 +514,30 @@ public class TestAvroSource {
boolean bound = false;
for (int i = 0; i < 100 && !bound; i++) {
- try {
- Context context = new Context();
- context.put("port", String.valueOf(selectedPort = 41414 + i));
- context.put("bind", "0.0.0.0");
- context.put("ipFilter", "true");
- if (ruleDefinition != null) {
- context.put("ipFilterRules", ruleDefinition);
- }
- if (testWithSSL) {
- logger.info("Client testWithSSL" + testWithSSL);
- context.put("ssl", "true");
- context.put("keystore", "src/test/resources/server.p12");
- context.put("keystore-password", "password");
- context.put("keystore-type", "PKCS12");
- }
-
- Configurables.configure(source, context);
+ Context context = new Context();
+ context.put("port", String.valueOf(selectedPort = 41414 + i));
+ context.put("bind", "0.0.0.0");
+ context.put("ipFilter", "true");
+ if (ruleDefinition != null) {
+ context.put("ipFilterRules", ruleDefinition);
+ }
+ if (testWithSSL) {
+ logger.info("Client testWithSSL" + testWithSSL);
+ context.put("ssl", "true");
+ context.put("keystore", "src/test/resources/server.p12");
+ context.put("keystore-password", "password");
+ context.put("keystore-type", "PKCS12");
+ }
+ // Invalid configuration may result in a FlumeException
+ Configurables.configure(source, context);
+
+ try {
source.start();
bound = true;
- } catch (ChannelException e) {
+ } catch (FlumeException e) {
/*
- * NB: This assume we're using the Netty server under the hood and the
- * failure is to bind. Yucky.
+ * NB: This assume the failure is to bind.
*/
Thread.sleep(100);
}