You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/06/30 11:31:43 UTC

flume git commit: FLUME-2752. Fix AvroSource startup resource leaks

Repository: flume
Updated Branches:
  refs/heads/trunk d01dfd321 -> b5e5ba50f


FLUME-2752. Fix AvroSource startup resource leaks

Cleanup after Netty initialisation fails (call this.stop())

- Make sure this.stop() releases the resources and end up the component in
  a LifecycleAware.STOPPED state
- Added junit test to cover the invalid host scenario
- Added junit test to cover the used port scenario

This closes #141.

Reviewers: Denes Arvay

(Attila Simon via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b5e5ba50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b5e5ba50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b5e5ba50

Branch: refs/heads/trunk
Commit: b5e5ba50f4333272b9e2f2be2b32027e667f32e2
Parents: d01dfd3
Author: Attila Simon <sa...@cloudera.com>
Authored: Thu Jun 29 08:21:44 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Fri Jun 30 13:29:35 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     |  55 +++---
 .../org/apache/flume/source/TestAvroSource.java | 168 ++++++++++++-------
 2 files changed, 140 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 762f690..e3467ec 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -20,7 +20,6 @@
 package org.apache.flume.source;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.NettyTransceiver;
@@ -156,6 +155,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private boolean enableIpFilter;
   private String patternRuleConfigDefinition;
 
+  private NioServerSocketChannelFactory socketChannelFactory;
   private Server server;
   private SourceCounter sourceCounter;
 
@@ -233,14 +233,20 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   public void start() {
     logger.info("Starting {}...", this);
 
-    Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
+    try {
+      Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
 
-    NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
+      socketChannelFactory = initSocketChannelFactory();
 
-    ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
+      ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
 
-    server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
-          socketChannelFactory, pipelineFactory, null);
+      server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
+              socketChannelFactory, pipelineFactory, null);
+    } catch (org.jboss.netty.channel.ChannelException nce) {
+      logger.error("Avro source {} startup failed. Cannot initialize Netty server", getName(), nce);
+      stop();
+      throw new FlumeException("Failed to set up server socket", nce);
+    }
 
     connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
     server.start();
@@ -300,28 +306,31 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   public void stop() {
     logger.info("Avro source {} stopping: {}", getName(), this);
 
-    server.close();
+    if (server != null) {
+      server.close();
+      try {
+        server.join();
+        server = null;
+      } catch (InterruptedException e) {
+        logger.info("Avro source " + getName() + ": Interrupted while waiting " +
+                "for Avro server to stop. Exiting. Exception follows.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
 
-    try {
-      server.join();
-    } catch (InterruptedException e) {
-      logger.info("Avro source " + getName() + ": Interrupted while waiting " +
-          "for Avro server to stop. Exiting. Exception follows.", e);
+    if (socketChannelFactory != null) {
+      socketChannelFactory.releaseExternalResources();
+      socketChannelFactory = null;
     }
+
     sourceCounter.stop();
-    connectionCountUpdater.shutdown();
-    while (!connectionCountUpdater.isTerminated()) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ex) {
-        logger.error("Interrupted while waiting for connection count executor "
-                + "to terminate", ex);
-        Throwables.propagate(ex);
-      }
+    if (connectionCountUpdater != null) {
+      connectionCountUpdater.shutdownNow();
+      connectionCountUpdater = null;
     }
+
     super.stop();
-    logger.info("Avro source {} stopped. Metrics: {}", getName(),
-        sourceCounter);
+    logger.info("Avro source {} stopped. Metrics: {}", getName(), sourceCounter);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/b5e5ba50/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index d73e5ad..77fcb22 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -20,12 +20,12 @@
 package org.apache.flume.source;
 
 import java.io.IOException;
-import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
+import java.nio.channels.ServerSocketChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -52,7 +52,6 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -97,20 +96,19 @@ public class TestAvroSource {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
-      try {
-        Context context = new Context();
-
-        context.put("port", String.valueOf(selectedPort = 41414 + i));
-        context.put("bind", "0.0.0.0");
 
-        Configurables.configure(source, context);
+      Context context = new Context();
 
+      context.put("port", String.valueOf(selectedPort = 41414 + i));
+      context.put("bind", "0.0.0.0");
+      // Invalid configuration may throw a FlumeException which has to be expected in the callers
+      Configurables.configure(source, context);
+      try {
         source.start();
         bound = true;
-      } catch (ChannelException e) {
+      } catch (FlumeException e) {
         /*
-         * NB: This assume we're using the Netty server under the hood and the
-         * failure is to bind. Yucky.
+         * NB: This assume the failure is to bind.
          */
       }
     }
@@ -129,6 +127,62 @@ public class TestAvroSource {
   }
 
   @Test
+  public void testSourceStoppedOnFlumeExceptionIfPortUsed()
+      throws InterruptedException, IOException {
+    final String loopbackIPv4 = "127.0.0.1";
+    final int port = 10500;
+
+    // create a dummy socket bound to a known port.
+    try (ServerSocketChannel dummyServerSocket = ServerSocketChannel.open()) {
+      dummyServerSocket.socket().setReuseAddress(true);
+      dummyServerSocket.socket().bind(new InetSocketAddress(loopbackIPv4, port));
+
+      Context context = new Context();
+      context.put("port", String.valueOf(port));
+      context.put("bind", loopbackIPv4);
+      Configurables.configure(source, context);
+      try {
+        source.start();
+        Assert.fail("Expected an exception during startup caused by binding on a used port");
+      } catch (FlumeException e) {
+        logger.info("Received an expected exception.", e);
+        Assert.assertTrue("Expected a server socket setup related root cause",
+            e.getMessage().contains("server socket"));
+      }
+    }
+    // As port is already in use, an exception is thrown and the source is stopped
+    // cleaning up the opened sockets during source.start().
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+            source.getLifecycleState());
+  }
+
+  @Test
+  public void testInvalidAddress()
+      throws InterruptedException, IOException {
+    final String invalidHost = "invalid.host";
+    final int port = 10501;
+
+    Context context = new Context();
+    context.put("port", String.valueOf(port));
+    context.put("bind", invalidHost);
+    Configurables.configure(source, context);
+
+    try {
+      source.start();
+      Assert.fail("Expected an exception during startup caused by binding on a invalid host");
+    } catch (FlumeException e) {
+      logger.info("Received an expected exception.", e);
+      Assert.assertTrue("Expected a server socket setup related root cause",
+          e.getMessage().contains("server socket"));
+    }
+
+    // As port is already in use, an exception is thrown and the source is stopped
+    // cleaning up the opened sockets during source.start().
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+  @Test
   public void testRequestWithNoCompression() throws InterruptedException, IOException {
 
     doRequest(false, false, 6);
@@ -179,25 +233,22 @@ public class TestAvroSource {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
+      Context context = new Context();
+      context.put("port", String.valueOf(selectedPort = 41414 + i));
+      context.put("bind", "0.0.0.0");
+      context.put("threads", "50");
+      if (serverEnableCompression) {
+        context.put("compression-type", "deflate");
+      } else {
+        context.put("compression-type", "none");
+      }
+      Configurables.configure(source, context);
       try {
-        Context context = new Context();
-        context.put("port", String.valueOf(selectedPort = 41414 + i));
-        context.put("bind", "0.0.0.0");
-        context.put("threads", "50");
-        if (serverEnableCompression) {
-          context.put("compression-type", "deflate");
-        } else {
-          context.put("compression-type", "none");
-        }
-
-        Configurables.configure(source, context);
-
         source.start();
         bound = true;
-      } catch (ChannelException e) {
+      } catch (FlumeException e) {
         /*
-         * NB: This assume we're using the Netty server under the hood and the
-         * failure is to bind. Yucky.
+         * NB: This assume the failure is to bind.
          */
       }
     }
@@ -282,24 +333,21 @@ public class TestAvroSource {
     boolean bound = false;
 
     for (int i = 0; i < 10 && !bound; i++) {
+      Context context = new Context();
+
+      context.put("port", String.valueOf(selectedPort = 41414 + i));
+      context.put("bind", "0.0.0.0");
+      context.put("ssl", "true");
+      context.put("keystore", "src/test/resources/server.p12");
+      context.put("keystore-password", "password");
+      context.put("keystore-type", "PKCS12");
+      Configurables.configure(source, context);
       try {
-        Context context = new Context();
-
-        context.put("port", String.valueOf(selectedPort = 41414 + i));
-        context.put("bind", "0.0.0.0");
-        context.put("ssl", "true");
-        context.put("keystore", "src/test/resources/server.p12");
-        context.put("keystore-password", "password");
-        context.put("keystore-type", "PKCS12");
-
-        Configurables.configure(source, context);
-
         source.start();
         bound = true;
-      } catch (ChannelException e) {
+      } catch (FlumeException e) {
         /*
-         * NB: This assume we're using the Netty server under the hood and the
-         * failure is to bind. Yucky.
+         * NB: This assume the failure is to bind.
          */
         Thread.sleep(100);
       }
@@ -466,30 +514,30 @@ public class TestAvroSource {
     boolean bound = false;
 
     for (int i = 0; i < 100 && !bound; i++) {
-      try {
-        Context context = new Context();
-        context.put("port", String.valueOf(selectedPort = 41414 + i));
-        context.put("bind", "0.0.0.0");
-        context.put("ipFilter", "true");
-        if (ruleDefinition != null) {
-          context.put("ipFilterRules", ruleDefinition);
-        }
-        if (testWithSSL) {
-          logger.info("Client testWithSSL" + testWithSSL);
-          context.put("ssl", "true");
-          context.put("keystore", "src/test/resources/server.p12");
-          context.put("keystore-password", "password");
-          context.put("keystore-type", "PKCS12");
-        }
-
-        Configurables.configure(source, context);
 
+      Context context = new Context();
+      context.put("port", String.valueOf(selectedPort = 41414 + i));
+      context.put("bind", "0.0.0.0");
+      context.put("ipFilter", "true");
+      if (ruleDefinition != null) {
+        context.put("ipFilterRules", ruleDefinition);
+      }
+      if (testWithSSL) {
+        logger.info("Client testWithSSL" + testWithSSL);
+        context.put("ssl", "true");
+        context.put("keystore", "src/test/resources/server.p12");
+        context.put("keystore-password", "password");
+        context.put("keystore-type", "PKCS12");
+      }
+      // Invalid configuration may result in a FlumeException
+      Configurables.configure(source, context);
+
+      try {
         source.start();
         bound = true;
-      } catch (ChannelException e) {
+      } catch (FlumeException e) {
         /*
-         * NB: This assume we're using the Netty server under the hood and the
-         * failure is to bind. Yucky.
+         * NB: This assume the failure is to bind.
          */
         Thread.sleep(100);
       }