You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/03/29 04:09:30 UTC
svn commit: r1306692 - in /incubator/flume/trunk:
flume-ng-core/src/main/java/org/apache/flume/event/
flume-ng-core/src/main/java/org/apache/flume/source/
flume-ng-node/src/test/java/org/apache/flume/source/
Author: arvind
Date: Thu Mar 29 02:09:29 2012
New Revision: 1306692
URL: http://svn.apache.org/viewvc?rev=1306692&view=rev
Log:
FLUME-1037. Netcat handler threads terminate under stress test.
(Mike Percy via Arvind Prabhakar)
Added:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java (with props)
Modified:
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java
incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java Thu Mar 29 02:09:29 2012
@@ -41,8 +41,10 @@ public class EventHelper {
public static String dumpEvent(Event event, int maxBytes) {
StringBuilder buffer = new StringBuilder();
- if(event == null) {
+ if (event == null || event.getBody() == null) {
buffer.append("null");
+ } else if (event.getBody().length == 0) {
+ // do nothing... in this case, HexDump.dump() will throw an exception
} else {
byte[] body = event.getBody();
byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Mar 29 02:09:29 2012
@@ -24,6 +24,7 @@ import java.io.Reader;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
@@ -46,6 +47,7 @@ import org.apache.flume.event.EventBuild
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -86,6 +88,12 @@ import com.google.common.util.concurrent
* <td>TCP port / int</td>
* <td>none (required)</td>
* </tr>
+ * <tr>
+ * <td><tt>max-line-length</tt></td>
+ * <td>The maximum # of chars a line can be per event (including newline).</td>
+ * <td>Number of UTF-8 characters / int</td>
+ * <td>512</td>
+ * </tr>
* </table>
* <p>
* <b>Metrics</b>
@@ -102,6 +110,7 @@ public class NetcatSource extends Abstra
private String hostName;
private int port;
+ private int maxLineLength;
private CounterGroup counterGroup;
private ServerSocketChannel serverSocket;
@@ -119,10 +128,16 @@ public class NetcatSource extends Abstra
@Override
public void configure(Context context) {
- Configurables.ensureRequiredNonNull(context, "bind", "port");
+ String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME;
+ String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT;
+
+ Configurables.ensureRequiredNonNull(context, hostKey, portKey);
- hostName = context.getString("bind");
- port = Integer.parseInt(context.getString("port"));
+ hostName = context.getString(hostKey);
+ port = context.getInteger(portKey);
+ maxLineLength = context.getInteger(
+ NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
+ NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
}
@Override
@@ -151,7 +166,7 @@ public class NetcatSource extends Abstra
return;
}
- AcceptHandler acceptRunnable = new AcceptHandler();
+ AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
acceptThreadShouldStop.set(false);
acceptRunnable.counterGroup = counterGroup;
acceptRunnable.handlerService = handlerService;
@@ -204,16 +219,19 @@ public class NetcatSource extends Abstra
if (handlerService != null) {
handlerService.shutdown();
- while (!handlerService.isTerminated()) {
- logger.debug("Waiting for handler service to stop");
- try {
- handlerService.awaitTermination(500, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger
- .debug("Interrupted while waiting for netcat handler service to stop");
- handlerService.shutdownNow();
- Thread.currentThread().interrupt();
- }
+ logger.debug("Waiting for handler service to stop");
+
+ // wait 500ms for threads to stop
+ try {
+ handlerService.awaitTermination(500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger
+ .debug("Interrupted while waiting for netcat handler service to stop");
+ Thread.currentThread().interrupt();
+ }
+
+ if (!handlerService.isShutdown()) {
+ handlerService.shutdownNow();
}
logger.debug("Handler service stopped");
@@ -222,15 +240,20 @@ public class NetcatSource extends Abstra
logger.debug("Source stopped. Event metrics:{}", counterGroup);
}
- public static class AcceptHandler implements Runnable {
+ private static class AcceptHandler implements Runnable {
private ServerSocketChannel serverSocket;
private CounterGroup counterGroup;
private ExecutorService handlerService;
private EventDrivenSource source;
-
private AtomicBoolean shouldStop;
+ private final int maxLineLength;
+
+ public AcceptHandler(int maxLineLength) {
+ this.maxLineLength = maxLineLength;
+ }
+
@Override
public void run() {
logger.debug("Starting accept handler");
@@ -239,7 +262,7 @@ public class NetcatSource extends Abstra
try {
SocketChannel socketChannel = serverSocket.accept();
- NetcatSocketHandler request = new NetcatSocketHandler();
+ NetcatSocketHandler request = new NetcatSocketHandler(maxLineLength);
request.socketChannel = socketChannel;
request.counterGroup = counterGroup;
@@ -260,59 +283,170 @@ public class NetcatSource extends Abstra
}
}
- public static class NetcatSocketHandler implements Runnable {
+ private static class NetcatSocketHandler implements Runnable {
private Source source;
-
private CounterGroup counterGroup;
private SocketChannel socketChannel;
+ private final int maxLineLength;
+
+ public NetcatSocketHandler(int maxLineLength) {
+ this.maxLineLength = maxLineLength;
+ }
+
@Override
public void run() {
+ logger.debug("Starting connection handler");
Event event = null;
try {
Reader reader = Channels.newReader(socketChannel, "utf-8");
Writer writer = Channels.newWriter(socketChannel, "utf-8");
- CharBuffer buffer = CharBuffer.allocate(512);
- StringBuilder builder = new StringBuilder();
+ CharBuffer buffer = CharBuffer.allocate(maxLineLength);
+ buffer.flip(); // flip() so fill() sees buffer as initially empty
- while (reader.read(buffer) != -1) {
- buffer.flip();
+ while (true) {
+ // this method blocks until new data is available in the socket
+ int charsRead = fill(buffer, reader);
+ logger.debug("Chars read = {}", charsRead);
+
+ // attempt to process all the events in the buffer
+ int eventsProcessed = processEvents(buffer, writer);
+ logger.debug("Events processed = {}", eventsProcessed);
+
+ if (charsRead == -1) {
+ // if we received EOF before last event processing attempt, then we
+ // have done everything we can
+ break;
+ } else if (charsRead == 0 && eventsProcessed == 0) {
+ if (buffer.remaining() == buffer.capacity()) {
+ // If we get here it means:
+ // 1. Last time we called fill(), no new chars were buffered
+ // 2. After that, we failed to process any events => no newlines
+ // 3. The unread data in the buffer == the size of the buffer
+ // Therefore, we are stuck because the client sent a line longer
+ // than the size of the buffer. Response: Drop the connection.
+ logger.warn("Client sent event exceeding the maximum length");
+ counterGroup.incrementAndGet("events.failed");
+ writer.write("FAILED: Event exceeds the maximum length (" +
+ buffer.capacity() + " chars, including newline)\n");
+ writer.flush();
+ break;
+ }
+ }
+ }
- logger.debug("read {} characters", buffer.remaining());
+ socketChannel.close();
- counterGroup.addAndGet("characters.received",
- Long.valueOf(buffer.limit()));
+ counterGroup.incrementAndGet("sessions.completed");
+ } catch (IOException e) {
+ counterGroup.incrementAndGet("sessions.broken");
+ }
- builder.append(buffer.array(), buffer.position(), buffer.length());
- }
+ logger.debug("Connection handler exiting");
+ }
+
+ /**
+ * <p>Consume some number of events from the buffer into the system.</p>
+ *
+ * Invariants (pre- and post-conditions): <br/>
+ * buffer should have position @ beginning of unprocessed data. <br/>
+ * buffer should have limit @ end of unprocessed data. <br/>
+ *
+ * @param buffer The buffer containing data to process
+ * @param writer The channel back to the client
+ * @return number of events successfully processed
+ * @throws IOException
+ */
+ private int processEvents(CharBuffer buffer, Writer writer)
+ throws IOException {
+
+ int numProcessed = 0;
+
+ boolean foundNewLine = true;
+ while (foundNewLine) {
+ foundNewLine = false;
+
+ int limit = buffer.limit();
+ for (int pos = buffer.position(); pos < limit; pos++) {
+ if (buffer.get(pos) == '\n') {
+
+ // parse event body bytes out of CharBuffer
+ buffer.limit(pos); // temporary limit
+ ByteBuffer bytes = Charsets.UTF_8.encode(buffer);
+ buffer.limit(limit); // restore limit
+
+ // build event object
+ byte[] body = new byte[bytes.remaining()];
+ bytes.get(body);
+ Event event = EventBuilder.withBody(body);
+
+ // process event
+ ChannelException ex = null;
+ try {
+ source.getChannelProcessor().processEvent(event);
+ } catch (ChannelException chEx) {
+ ex = chEx;
+ }
+
+ if (ex == null) {
+ counterGroup.incrementAndGet("events.processed");
+ numProcessed++;
+ writer.write("OK\n");
+ } else {
+ counterGroup.incrementAndGet("events.failed");
+ logger.warn("Error processing event. Exception follows.", ex);
+ writer.write("FAILED: " + ex.getMessage() + "\n");
+ }
+ writer.flush();
+
+ // advance position after data is consumed
+ buffer.position(pos + 1); // skip newline
+ foundNewLine = true;
- if (builder.charAt(builder.length() - 1) == '\n') {
- builder.deleteCharAt(builder.length() - 1);
+ break;
+ }
}
- event = EventBuilder.withBody(builder.toString().getBytes());
- Exception ex = null;
+ }
- try {
- source.getChannelProcessor().processEvent(event);
- } catch (ChannelException chEx) {
- ex = chEx;
- }
+ return numProcessed;
+ }
- if (ex == null) {
- writer.append("OK\n");
- } else {
- writer.append("FAILED: " + ex.getMessage() + "\n");
- }
+ /**
+ * <p>Refill the buffer read from the socket.</p>
+ *
+ * Preconditions: <br/>
+ * buffer should have position @ beginning of unprocessed data. <br/>
+ * buffer should have limit @ end of unprocessed data. <br/>
+ *
+ * Postconditions: <br/>
+ * buffer should have position @ beginning of buffer (pos=0). <br/>
+ * buffer should have limit @ end of unprocessed data. <br/>
+ *
+ * Note: this method blocks on new data arriving.
+ *
+ * @param buffer The buffer to fill
+ * @param reader The Reader to read the data from
+ * @return number of characters read
+ * @throws IOException
+ */
+ private int fill(CharBuffer buffer, Reader reader)
+ throws IOException {
+
+ // move existing data to the front of the buffer
+ buffer.compact();
+
+ // pull in as much data as we can from the socket
+ int charsRead = reader.read(buffer);
+ counterGroup.addAndGet("characters.received", Long.valueOf(charsRead));
- socketChannel.close();
+ // flip so the data can be consumed
+ buffer.flip();
- counterGroup.incrementAndGet("events.success");
- } catch (IOException e) {
- counterGroup.incrementAndGet("events.failed");
- }
+ return charsRead;
}
+
}
}
Added: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java?rev=1306692&view=auto
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java (added)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java Thu Mar 29 02:09:29 2012
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+public class NetcatSourceConfigurationConstants {
+
+ /**
+ * Hostname to bind to.
+ */
+ public static final String CONFIG_HOSTNAME = "bind";
+
+ /**
+ * Port to bind to.
+ */
+ public static final String CONFIG_PORT = "port";
+
+ /**
+ * Maximum line length per event.
+ */
+ public static final String CONFIG_MAX_LINE_LENGTH = "max-line-length";
+ public static final int DEFAULT_MAX_LINE_LENGTH = 512;
+
+}
Propchange: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1306692&r1=1306691&r2=1306692&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Thu Mar 29 02:09:29 2012
@@ -19,17 +19,18 @@
package org.apache.flume.source;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
@@ -45,22 +46,28 @@ import org.apache.flume.lifecycle.Lifecy
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestNetcatSource {
private Channel channel;
private EventDrivenSource source;
+ private static final Logger logger =
+ LoggerFactory.getLogger(TestNetcatSource.class);
+
@Before
public void setUp() {
+ logger.info("Running setup");
+
channel = new MemoryChannel();
source = new NetcatSource();
Context context = new Context();
Configurables.configure(channel, context);
- List<Channel> channels = new ArrayList<Channel>();
- channels.add(channel);
+ List<Channel> channels = Lists.newArrayList(channel);
ChannelSelector rcs = new ReplicatingChannelSelector();
rcs.setChannels(channels);
@@ -82,8 +89,6 @@ public class TestNetcatSource {
source.start();
- /* FIXME: Ensure proper send / received semantics. */
-
Runnable clientRequestRunnable = new Runnable() {
@Override
@@ -93,24 +98,29 @@ public class TestNetcatSource {
.open(new InetSocketAddress(41414));
Writer writer = Channels.newWriter(clientChannel, "utf-8");
+ BufferedReader reader = new BufferedReader(
+ Channels.newReader(clientChannel, "utf-8"));
- writer.write("Test message");
-
+ writer.write("Test message\n");
writer.flush();
+
+ String response = reader.readLine();
+ Assert.assertEquals("Server should return OK", "OK", response);
clientChannel.close();
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ logger.error("Caught exception: ", e);
}
}
};
- ChannelSelector seclector = source.getChannelProcessor().getSelector();
- Transaction tx = seclector.getAllChannels().get(0).getTransaction();
+ ChannelSelector selector = source.getChannelProcessor().getSelector();
+ Transaction tx = selector.getAllChannels().get(0).getTransaction();
tx.begin();
for (int i = 0; i < 100; i++) {
+ logger.info("Sending request");
+
executor.submit(clientRequestRunnable);
Event event = channel.take();