You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/29 04:09:30 UTC

svn commit: r1306692 - in /incubator/flume/trunk: flume-ng-core/src/main/java/org/apache/flume/event/ flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-node/src/test/java/org/apache/flume/source/

Author: arvind
Date: Thu Mar 29 02:09:29 2012
New Revision: 1306692

URL: http://svn.apache.org/viewvc?rev=1306692&view=rev
Log:
FLUME-1037. Netcat handler threads terminate under stress test.

(Mike Percy via Arvind Prabhakar)

Added:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java   (with props)
Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java Thu Mar 29 02:09:29 2012
@@ -41,8 +41,10 @@ public class EventHelper {
 
   public static String dumpEvent(Event event, int maxBytes) {
     StringBuilder buffer = new StringBuilder();
-    if(event == null) {
+    if (event == null || event.getBody() == null) {
       buffer.append("null");
+    } else if (event.getBody().length == 0) {
+      // do nothing... in this case, HexDump.dump() will throw an exception
     } else {
       byte[] body = event.getBody();
       byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Mar 29 02:09:29 2012
@@ -24,6 +24,7 @@ import java.io.Reader;
 import java.io.Writer;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedByInterruptException;
@@ -46,6 +47,7 @@ import org.apache.flume.event.EventBuild
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -86,6 +88,12 @@ import com.google.common.util.concurrent
  * <td>TCP port / int</td>
  * <td>none (required)</td>
  * </tr>
+ * <tr>
+ * <td><tt>max-line-length</tt></td>
+ * <td>The maximum # of chars a line can be per event (including newline).</td>
+ * <td>Number of UTF-8 characters / int</td>
+ * <td>512</td>
+ * </tr>
  * </table>
  * <p>
  * <b>Metrics</b>
@@ -102,6 +110,7 @@ public class NetcatSource extends Abstra
 
   private String hostName;
   private int port;
+  private int maxLineLength;
 
   private CounterGroup counterGroup;
   private ServerSocketChannel serverSocket;
@@ -119,10 +128,16 @@ public class NetcatSource extends Abstra
 
   @Override
   public void configure(Context context) {
-    Configurables.ensureRequiredNonNull(context, "bind", "port");
+    String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME;
+    String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT;
+
+    Configurables.ensureRequiredNonNull(context, hostKey, portKey);
 
-    hostName = context.getString("bind");
-    port = Integer.parseInt(context.getString("port"));
+    hostName = context.getString(hostKey);
+    port = context.getInteger(portKey);
+    maxLineLength = context.getInteger(
+        NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
+        NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
   }
 
   @Override
@@ -151,7 +166,7 @@ public class NetcatSource extends Abstra
       return;
     }
 
-    AcceptHandler acceptRunnable = new AcceptHandler();
+    AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
     acceptThreadShouldStop.set(false);
     acceptRunnable.counterGroup = counterGroup;
     acceptRunnable.handlerService = handlerService;
@@ -204,16 +219,19 @@ public class NetcatSource extends Abstra
     if (handlerService != null) {
       handlerService.shutdown();
 
-      while (!handlerService.isTerminated()) {
-        logger.debug("Waiting for handler service to stop");
-        try {
-          handlerService.awaitTermination(500, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-          logger
-              .debug("Interrupted while waiting for netcat handler service to stop");
-          handlerService.shutdownNow();
-          Thread.currentThread().interrupt();
-        }
+      logger.debug("Waiting for handler service to stop");
+
+      // wait 500ms for threads to stop
+      try {
+        handlerService.awaitTermination(500, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        logger
+            .debug("Interrupted while waiting for netcat handler service to stop");
+        Thread.currentThread().interrupt();
+      }
+
+      if (!handlerService.isShutdown()) {
+        handlerService.shutdownNow();
       }
 
       logger.debug("Handler service stopped");
@@ -222,15 +240,20 @@ public class NetcatSource extends Abstra
     logger.debug("Source stopped. Event metrics:{}", counterGroup);
   }
 
-  public static class AcceptHandler implements Runnable {
+  private static class AcceptHandler implements Runnable {
 
     private ServerSocketChannel serverSocket;
     private CounterGroup counterGroup;
     private ExecutorService handlerService;
     private EventDrivenSource source;
-
     private AtomicBoolean shouldStop;
 
+    private final int maxLineLength;
+
+    public AcceptHandler(int maxLineLength) {
+      this.maxLineLength = maxLineLength;
+    }
+
     @Override
     public void run() {
       logger.debug("Starting accept handler");
@@ -239,7 +262,7 @@ public class NetcatSource extends Abstra
         try {
           SocketChannel socketChannel = serverSocket.accept();
 
-          NetcatSocketHandler request = new NetcatSocketHandler();
+          NetcatSocketHandler request = new NetcatSocketHandler(maxLineLength);
 
           request.socketChannel = socketChannel;
           request.counterGroup = counterGroup;
@@ -260,59 +283,170 @@ public class NetcatSource extends Abstra
     }
   }
 
-  public static class NetcatSocketHandler implements Runnable {
+  private static class NetcatSocketHandler implements Runnable {
 
     private Source source;
-
     private CounterGroup counterGroup;
     private SocketChannel socketChannel;
 
+    private final int maxLineLength;
+
+    public NetcatSocketHandler(int maxLineLength) {
+      this.maxLineLength = maxLineLength;
+    }
+
     @Override
     public void run() {
+      logger.debug("Starting connection handler");
       Event event = null;
 
       try {
         Reader reader = Channels.newReader(socketChannel, "utf-8");
         Writer writer = Channels.newWriter(socketChannel, "utf-8");
-        CharBuffer buffer = CharBuffer.allocate(512);
-        StringBuilder builder = new StringBuilder();
+        CharBuffer buffer = CharBuffer.allocate(maxLineLength);
+        buffer.flip(); // flip() so fill() sees buffer as initially empty
 
-        while (reader.read(buffer) != -1) {
-          buffer.flip();
+        while (true) {
+          // this method blocks until new data is available in the socket
+          int charsRead = fill(buffer, reader);
+          logger.debug("Chars read = {}", charsRead);
+
+          // attempt to process all the events in the buffer
+          int eventsProcessed = processEvents(buffer, writer);
+          logger.debug("Events processed = {}", eventsProcessed);
+
+          if (charsRead == -1) {
+            // if we received EOF before last event processing attempt, then we
+            // have done everything we can
+            break;
+          } else if (charsRead == 0 && eventsProcessed == 0) {
+            if (buffer.remaining() == buffer.capacity()) {
+              // If we get here it means:
+              // 1. Last time we called fill(), no new chars were buffered
+              // 2. After that, we failed to process any events => no newlines
+              // 3. The unread data in the buffer == the size of the buffer
+              // Therefore, we are stuck because the client sent a line longer
+              // than the size of the buffer. Response: Drop the connection.
+              logger.warn("Client sent event exceeding the maximum length");
+              counterGroup.incrementAndGet("events.failed");
+              writer.write("FAILED: Event exceeds the maximum length (" +
+                  buffer.capacity() + " chars, including newline)\n");
+              writer.flush();
+              break;
+            }
+          }
+        }
 
-          logger.debug("read {} characters", buffer.remaining());
+        socketChannel.close();
 
-          counterGroup.addAndGet("characters.received",
-              Long.valueOf(buffer.limit()));
+        counterGroup.incrementAndGet("sessions.completed");
+      } catch (IOException e) {
+        counterGroup.incrementAndGet("sessions.broken");
+      }
 
-          builder.append(buffer.array(), buffer.position(), buffer.length());
-        }
+      logger.debug("Connection handler exiting");
+    }
+
+    /**
+     * <p>Consume some number of events from the buffer into the system.</p>
+     *
+     * Invariants (pre- and post-conditions): <br/>
+     *   buffer should have position @ beginning of unprocessed data. <br/>
+     *   buffer should have limit @ end of unprocessed data. <br/>
+     *
+     * @param buffer The buffer containing data to process
+     * @param writer The channel back to the client
+     * @return number of events successfully processed
+     * @throws IOException
+     */
+    private int processEvents(CharBuffer buffer, Writer writer)
+        throws IOException {
+
+      int numProcessed = 0;
+
+      boolean foundNewLine = true;
+      while (foundNewLine) {
+        foundNewLine = false;
+
+        int limit = buffer.limit();
+        for (int pos = buffer.position(); pos < limit; pos++) {
+          if (buffer.get(pos) == '\n') {
+
+            // parse event body bytes out of CharBuffer
+            buffer.limit(pos); // temporary limit
+            ByteBuffer bytes = Charsets.UTF_8.encode(buffer);
+            buffer.limit(limit); // restore limit
+
+            // build event object
+            byte[] body = new byte[bytes.remaining()];
+            bytes.get(body);
+            Event event = EventBuilder.withBody(body);
+
+            // process event
+            ChannelException ex = null;
+            try {
+              source.getChannelProcessor().processEvent(event);
+            } catch (ChannelException chEx) {
+              ex = chEx;
+            }
+
+            if (ex == null) {
+              counterGroup.incrementAndGet("events.processed");
+              numProcessed++;
+              writer.write("OK\n");
+            } else {
+              counterGroup.incrementAndGet("events.failed");
+              logger.warn("Error processing event. Exception follows.", ex);
+              writer.write("FAILED: " + ex.getMessage() + "\n");
+            }
+            writer.flush();
+
+            // advance position after data is consumed
+            buffer.position(pos + 1); // skip newline
+            foundNewLine = true;
 
-        if (builder.charAt(builder.length() - 1) == '\n') {
-          builder.deleteCharAt(builder.length() - 1);
+            break;
+          }
         }
 
-        event = EventBuilder.withBody(builder.toString().getBytes());
-        Exception ex = null;
+      }
 
-        try {
-          source.getChannelProcessor().processEvent(event);
-        } catch (ChannelException chEx) {
-          ex = chEx;
-        }
+      return numProcessed;
+    }
 
-        if (ex == null) {
-          writer.append("OK\n");
-        } else {
-          writer.append("FAILED: " + ex.getMessage() + "\n");
-        }
+    /**
+     * <p>Refill the buffer read from the socket.</p>
+     *
+     * Preconditions: <br/>
+     *   buffer should have position @ beginning of unprocessed data. <br/>
+     *   buffer should have limit @ end of unprocessed data. <br/>
+     *
+     * Postconditions: <br/>
+     *   buffer should have position @ beginning of buffer (pos=0). <br/>
+     *   buffer should have limit @ end of unprocessed data. <br/>
+     *
+     * Note: this method blocks on new data arriving.
+     *
+     * @param buffer The buffer to fill
+     * @param reader The Reader to read the data from
+     * @return number of characters read
+     * @throws IOException
+     */
+    private int fill(CharBuffer buffer, Reader reader)
+        throws IOException {
+
+      // move existing data to the front of the buffer
+      buffer.compact();
+
+      // pull in as much data as we can from the socket
+      int charsRead = reader.read(buffer);
+      counterGroup.addAndGet("characters.received", Long.valueOf(charsRead));
 
-        socketChannel.close();
+      // flip so the data can be consumed
+      buffer.flip();
 
-        counterGroup.incrementAndGet("events.success");
-      } catch (IOException e) {
-        counterGroup.incrementAndGet("events.failed");
-      }
+      return charsRead;
     }
+
   }
 }

Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java?rev=1306692&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java Thu Mar 29 02:09:29 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+public class NetcatSourceConfigurationConstants {
+
+  /**
+   * Hostname to bind to.
+   */
+  public static final String CONFIG_HOSTNAME = "bind";
+
+  /**
+   * Port to bind to.
+   */
+  public static final String CONFIG_PORT = "port";
+
+  /**
+   * Maximum line length per event.
+   */
+  public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length";
+  public static final int DEFAULT_MAX_LINE_LENGTH = 512;
+
+}

Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Mar 29 02:09:29 2012
@@ -19,17 +19,18 @@
 
 package org.apache.flume.source;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
 import java.nio.channels.Channels;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
@@ -45,22 +46,28 @@ import org.apache.flume.lifecycle.Lifecy
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestNetcatSource {
 
   private Channel channel;
   private EventDrivenSource source;
 
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestNetcatSource.class);
+
   @Before
   public void setUp() {
+    logger.info("Running setup");
+
     channel = new MemoryChannel();
     source = new NetcatSource();
 
     Context context = new Context();
 
     Configurables.configure(channel, context);
-    List<Channel> channels = new ArrayList<Channel>();
-    channels.add(channel);
+    List<Channel> channels = Lists.newArrayList(channel);
     ChannelSelector rcs = new ReplicatingChannelSelector();
     rcs.setChannels(channels);
 
@@ -82,8 +89,6 @@ public class TestNetcatSource {
 
     source.start();
 
-    /* FIXME: Ensure proper send / received semantics. */
-
     Runnable clientRequestRunnable = new Runnable() {
 
       @Override
@@ -93,24 +98,29 @@ public class TestNetcatSource {
               .open(new InetSocketAddress(41414));
 
           Writer writer = Channels.newWriter(clientChannel, "utf-8");
+          BufferedReader reader = new BufferedReader(
+              Channels.newReader(clientChannel, "utf-8"));
 
-          writer.write("Test message");
-
+          writer.write("Test message\n");
           writer.flush();
+
+          String response = reader.readLine();
+          Assert.assertEquals("Server should return OK", "OK", response);
           clientChannel.close();
         } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
+          logger.error("Caught exception: ", e);
         }
       }
 
     };
 
-    ChannelSelector seclector = source.getChannelProcessor().getSelector();
-    Transaction tx = seclector.getAllChannels().get(0).getTransaction();
+    ChannelSelector selector = source.getChannelProcessor().getSelector();
+    Transaction tx = selector.getAllChannels().get(0).getTransaction();
     tx.begin();
 
     for (int i = 0; i < 100; i++) {
+      logger.info("Sending request");
+
       executor.submit(clientRequestRunnable);
 
       Event event = channel.take();